diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index deb3be5d3a64..d1a88ef4f839 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -97,4 +97,10 @@ public class RESTCatalogOptions { .noDefaultValue() .withDescription( "The user agent of http client connecting to REST Catalog server."); + + public static final ConfigOption DLF_OSS_ENDPOINT = + ConfigOptions.key("dlf.oss-endpoint") + .stringType() + .noDefaultValue() + .withDescription("REST Catalog DLF OSS endpoint."); } diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTUtil.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTUtil.java index 52a633b62a61..5872af209154 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTUtil.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTUtil.java @@ -64,21 +64,33 @@ public static Map extractPrefixMap( return result; } + /** + * Merges two string maps with override properties taking precedence over base properties. + * + *

This method combines two maps of string key-value pairs, where the override map's values + * will override any conflicting keys from the base map. Only non-null values are included in + * the final result. + */ public static Map merge( - Map targets, Map updates) { - if (targets == null) { - targets = Maps.newHashMap(); + Map baseProperties, Map overrideProperties) { + if (overrideProperties == null) { + overrideProperties = Maps.newHashMap(); } - if (updates == null) { - updates = Maps.newHashMap(); + if (baseProperties == null) { + baseProperties = Maps.newHashMap(); } + ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Map.Entry entry : targets.entrySet()) { - if (!updates.containsKey(entry.getKey()) && entry.getValue() != null) { + + // First, add all non-null entries from baseProperties that are not in overrideProperties + for (Map.Entry entry : baseProperties.entrySet()) { + if (entry.getValue() != null && !overrideProperties.containsKey(entry.getKey())) { builder.put(entry.getKey(), entry.getValue()); } } - for (Map.Entry entry : updates.entrySet()) { + + // Then, add all non-null entries from overrideProperties (these take precedence) + for (Map.Entry entry : overrideProperties.entrySet()) { if (entry.getValue() != null) { builder.put(entry.getKey(), entry.getValue()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTUtilTest.java b/paimon-api/src/test/java/org/apache/paimon/utils/RESTUtilTest.java similarity index 74% rename from paimon-core/src/test/java/org/apache/paimon/rest/RESTUtilTest.java rename to paimon-api/src/test/java/org/apache/paimon/utils/RESTUtilTest.java index 025610ac5008..86849f9f62dc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTUtilTest.java +++ b/paimon-api/src/test/java/org/apache/paimon/utils/RESTUtilTest.java @@ -16,7 +16,9 @@ * limitations under the License. */ -package org.apache.paimon.rest; +package org.apache.paimon.utils; + +import org.apache.paimon.rest.RESTUtil; import org.junit.jupiter.api.Test; @@ -29,6 +31,7 @@ public class RESTUtilTest { @Test public void testMerge() { + // Test case 1: updates has precedence over targets for existing keys { Map targets = new HashMap<>(); targets.put("key1", "default1"); @@ -37,8 +40,11 @@ public void testMerge() { updates.put("key2", "update2"); Map result = RESTUtil.merge(targets, updates); assertEquals(result.get("key1"), "default1"); + // key2 should be overridden by updates value assertEquals(result.get("key2"), "update2"); } + + // Test case 2: updates has precedence even when targets has same value { Map targets = new HashMap<>(); targets.put("key1", "default1"); @@ -48,8 +54,11 @@ public void testMerge() { updates.put("key2", "update2"); Map result = RESTUtil.merge(targets, updates); assertEquals(result.get("key1"), "default1"); + // key2 should be overridden by updates value assertEquals(result.get("key2"), "update2"); } + + // Test case 3: empty updates, targets unchanged { Map targets = new HashMap<>(); targets.put("key1", "default1"); @@ -59,6 +68,8 @@ public void testMerge() { assertEquals(result.get("key1"), "default1"); assertEquals(result.get("key2"), "default2"); } + + // Test case 4: empty targets, updates are added { Map targets = new HashMap<>(); Map updates = new HashMap<>(); @@ -66,18 +77,24 @@ public void testMerge() { Map result = RESTUtil.merge(targets, updates); assertEquals(result.get("key2"), "update2"); } + + // Test case 5: both empty { Map targets = new HashMap<>(); Map updates = new HashMap<>(); Map result = RESTUtil.merge(targets, updates); assertEquals(result.size(), 0); } + + // Test case 6: both null { Map targets = null; Map updates = null; Map result = RESTUtil.merge(targets, updates); assertEquals(result.size(), 0); } + + // Test case 7: null values are ignored { Map targets = new HashMap<>(); targets.put("key3", null); @@ -86,5 +103,19 @@ public void testMerge() { Map result = RESTUtil.merge(targets, updates); assertEquals(result.size(), 0); } + + // Test case 8: updates adds new keys that don't exist in targets + { + Map targets = new HashMap<>(); + targets.put("key1", "default1"); + Map updates = new HashMap<>(); + updates.put("key2", "update2"); + updates.put("key3", "update3"); + Map result = RESTUtil.merge(targets, updates); + assertEquals(result.get("key1"), "default1"); + assertEquals(result.get("key2"), "update2"); + assertEquals(result.get("key3"), "update3"); + assertEquals(result.size(), 3); + } } } diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index fc4134c8b8fa..02fa24e1a642 100644 --- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -35,17 +35,21 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE; import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT; /** A {@link FileIO} to support getting token from REST Server. */ public class RESTTokenFileIO implements FileIO { @@ -162,8 +166,10 @@ public FileIO fileIO() throws IOException { } Options options = catalogContext.options(); - // the original options are not overwritten - options = new Options(RESTUtil.merge(token.token(), options.toMap())); + options = + new Options( + RESTTokenFileIO.mergeTokenWithDlfEndpointHandling( + options.toMap(), token.token())); options.set(FILE_IO_ALLOW_CACHE, false); CatalogContext context = CatalogContext.create( @@ -211,6 +217,33 @@ private void refreshToken() { token = new RESTToken(response.getToken(), response.getExpiresAtMillis()); } + /** + * Merges token properties with catalog properties and handles DLF OSS endpoint configuration. + * + *

This method performs the same merge logic as {@link RESTUtil#merge(Map, Map)} but also + * handles the special case where the DLF OSS endpoint should override the standard OSS + * endpoint. When 'dlf.oss-endpoint' is present in the merged properties, it will be used to set + * 'fs.oss.endpoint' for OSS file system configuration. + * + * @param restTokenProperties the properties from the REST token + * @param catalogProperties the catalog properties to merge with + * @return merged properties with DLF OSS endpoint handling applied + */ + public static Map mergeTokenWithDlfEndpointHandling( + Map catalogProperties, Map restTokenProperties) { + // Use RESTUtil.merge for the basic merge logic + Map result = + Maps.newLinkedHashMap(RESTUtil.merge(catalogProperties, restTokenProperties)); + + // Handle special case: dlf.oss-endpoint should override fs.oss.endpoint + String dlfOssEndpoint = result.get(DLF_OSS_ENDPOINT.key()); + if (dlfOssEndpoint != null && !dlfOssEndpoint.isEmpty()) { + result.put("fs.oss.endpoint", dlfOssEndpoint); + } + + return ImmutableMap.copyOf(result); + } + /** * Public interface to get valid token, this can be invoked by native engines to get the token * and use own File System. diff --git a/paimon-common/src/test/java/org/apache/paimon/rest/RESTTokenFileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/rest/RESTTokenFileIOTest.java new file mode 100644 index 000000000000..a5133546e22a --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/rest/RESTTokenFileIOTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Test for {@link RESTTokenFileIO}. */ +public class RESTTokenFileIOTest { + @Test + public void testMergeTokenWithDlfEndpointHandling() { + // Test case 1: dlf.oss-endpoint overrides fs.oss.endpoint when present and non-empty + { + Map catalogProperties = new HashMap<>(); + catalogProperties.put("fs.oss.endpoint", "original-endpoint"); + catalogProperties.put("other.config", "value1"); + Map restTokenProperties = new HashMap<>(); + restTokenProperties.put(DLF_OSS_ENDPOINT.key(), "new-oss-endpoint"); + restTokenProperties.put("other.config", "value2"); // This should override + Map result = + RESTTokenFileIO.mergeTokenWithDlfEndpointHandling( + catalogProperties, restTokenProperties); + assertEquals("new-oss-endpoint", result.get("fs.oss.endpoint")); + assertEquals( + "value2", result.get("other.config")); // restTokenProperties takes precedence + assertEquals("new-oss-endpoint", result.get(DLF_OSS_ENDPOINT.key())); + assertEquals(3, result.size()); + } + + // Test case 2: dlf.oss-endpoint adds fs.oss.endpoint when not present in catalogProperties + { + Map catalogProperties = new HashMap<>(); + catalogProperties.put("other.config", "value1"); + Map restTokenProperties = new HashMap<>(); + restTokenProperties.put(DLF_OSS_ENDPOINT.key(), "new-oss-endpoint"); + Map result = + RESTTokenFileIO.mergeTokenWithDlfEndpointHandling( + catalogProperties, restTokenProperties); + assertEquals("new-oss-endpoint", result.get("fs.oss.endpoint")); + assertEquals("value1", result.get("other.config")); + assertEquals("new-oss-endpoint", result.get(DLF_OSS_ENDPOINT.key())); + assertEquals(3, result.size()); + } + + // Test case 3: Empty dlf.oss-endpoint should not override + { + Map catalogProperties = new HashMap<>(); + catalogProperties.put("fs.oss.endpoint", "original-endpoint"); + Map restTokenProperties = new HashMap<>(); + restTokenProperties.put(DLF_OSS_ENDPOINT.key(), ""); + Map result = + RESTTokenFileIO.mergeTokenWithDlfEndpointHandling( + catalogProperties, restTokenProperties); + assertEquals("original-endpoint", result.get("fs.oss.endpoint")); + assertEquals( + 2, result.size()); // fs.oss.endpoint and dlf.oss-endpoint (empty string is not + // null) + } + + // Test case 4: Null dlf.oss-endpoint should not override + { + Map catalogProperties = new HashMap<>(); + catalogProperties.put("fs.oss.endpoint", "original-endpoint"); + Map restTokenProperties = new HashMap<>(); + restTokenProperties.put(DLF_OSS_ENDPOINT.key(), null); + Map result = + RESTTokenFileIO.mergeTokenWithDlfEndpointHandling( + catalogProperties, restTokenProperties); + assertEquals("original-endpoint", result.get("fs.oss.endpoint")); + assertEquals(1, result.size()); // Only fs.oss.endpoint (null values are filtered out) + } + + // Test case 5: No dlf.oss-endpoint in restTokenProperties, fs.oss.endpoint unchanged + { + Map catalogProperties = new HashMap<>(); + catalogProperties.put("fs.oss.endpoint", "original-endpoint"); + Map restTokenProperties = new HashMap<>(); + restTokenProperties.put("other.config", "value1"); + Map result = + RESTTokenFileIO.mergeTokenWithDlfEndpointHandling( + catalogProperties, restTokenProperties); + assertEquals("original-endpoint", result.get("fs.oss.endpoint")); + assertEquals("value1", result.get("other.config")); + assertEquals(2, result.size()); + } + + // Test case 6: Both empty maps + { + Map catalogProperties = new HashMap<>(); + Map restTokenProperties = new HashMap<>(); + Map result = + RESTTokenFileIO.mergeTokenWithDlfEndpointHandling( + catalogProperties, restTokenProperties); + assertEquals(0, result.size()); + } + + // Test case 7: Both null maps + { + Map result = + RESTTokenFileIO.mergeTokenWithDlfEndpointHandling(null, null); + assertEquals(0, result.size()); + } + + // Test case 8: restTokenProperties adds new keys that don't exist in catalogProperties + { + Map catalogProperties = new HashMap<>(); + catalogProperties.put("key1", "catalog1"); + Map restTokenProperties = new HashMap<>(); + restTokenProperties.put("key2", "token2"); + restTokenProperties.put("key3", "token3"); + restTokenProperties.put(DLF_OSS_ENDPOINT.key(), "dlf-endpoint"); + Map result = + RESTTokenFileIO.mergeTokenWithDlfEndpointHandling( + catalogProperties, restTokenProperties); + assertEquals("catalog1", result.get("key1")); + assertEquals("token2", result.get("key2")); + assertEquals("token3", result.get("key3")); + assertEquals("dlf-endpoint", result.get(DLF_OSS_ENDPOINT.key())); + assertEquals("dlf-endpoint", result.get("fs.oss.endpoint")); // DLF endpoint handling + assertEquals(5, result.size()); + } + } +}