From aab07ab7821df1266e197f5c37fa93e2c4889942 Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Wed, 12 Feb 2025 15:55:33 +0800 Subject: [PATCH] [fix](jdbc catalog) Change BE jdbc Driver loading to Java code (#46912) Related PR: #45806 Problem Summary: In the previous BE processing of JDBC Driver, the Driver jar will be downloaded to the local cache directory first, and the cached jar package will be provided to the JVM for loading. This will cause two problems 1. The jar package in the cache may fail to load due to duplication 2. Frequent repeated loading of the driver by the JVM will cause Compressed class space OOM In order to fix these two problems, this PR has the following changes 1. Remove the logic of BE downloading the Driver Jar to the local cache directory, and directly hand over the original path to Java's Classloader for processing 2. Treat the jar packages with the same name in the same path as one and cache them in the map to avoid repeated loading and cause Compressed class space OOM --- be/src/runtime/user_function_cache.cpp | 20 ++---- be/src/runtime/user_function_cache.h | 1 - be/src/vec/exec/vjdbc_connector.cpp | 29 +++----- be/src/vec/exec/vjdbc_connector.h | 2 + .../apache/doris/jdbc/BaseJdbcExecutor.java | 70 ++++++++++++++++++- .../doris/jdbc/JdbcDataSourceConfig.java | 10 +++ .../apache/doris/catalog/JdbcResource.java | 7 +- .../doris/catalog/JdbcResourceTest.java | 51 ++++++++++++++ gensrc/thrift/Types.thrift | 1 + 9 files changed, 153 insertions(+), 38 deletions(-) diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp index ab9d90846abbea..a5f354180b8e72 100644 --- a/be/src/runtime/user_function_cache.cpp +++ b/be/src/runtime/user_function_cache.cpp @@ -272,12 +272,10 @@ Status UserFunctionCache::_download_lib(const std::string& url, return Status::InternalError("fail to open file"); } - std::string real_url = _get_real_url(url); - Md5Digest digest; HttpClient client; int64_t file_size = 0; - RETURN_IF_ERROR(client.init(real_url)); + RETURN_IF_ERROR(client.init(url)); Status status; auto download_cb = [&status, &tmp_file, &fp, &digest, &file_size](const void* data, size_t length) { @@ -297,11 +295,10 @@ Status UserFunctionCache::_download_lib(const std::string& url, digest.digest(); if (!iequal(digest.hex(), entry->checksum)) { fmt::memory_buffer error_msg; - fmt::format_to( - error_msg, - " The checksum is not equal of {} ({}). The init info of first create entry is:" - "{} But download file check_sum is: {}, file_size is: {}.", - url, real_url, entry->debug_string(), digest.hex(), file_size); + fmt::format_to(error_msg, + " The checksum is not equal of {}. The init info of first create entry is:" + "{} But download file check_sum is: {}, file_size is: {}.", + url, entry->debug_string(), digest.hex(), file_size); std::string error(fmt::to_string(error_msg)); LOG(WARNING) << error; return Status::InternalError(error); @@ -323,13 +320,6 @@ Status UserFunctionCache::_download_lib(const std::string& url, return Status::OK(); } -std::string UserFunctionCache::_get_real_url(const std::string& url) { - if (url.find(":/") == std::string::npos) { - return "file://" + config::jdbc_drivers_dir + "/" + url; - } - return url; -} - std::string UserFunctionCache::_get_file_name_from_url(const std::string& url) const { std::string file_name; size_t last_slash_pos = url.find_last_of('/'); diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h index 5d1bff8b8664b7..93759c261e2b85 100644 --- a/be/src/runtime/user_function_cache.h +++ b/be/src/runtime/user_function_cache.h @@ -72,7 +72,6 @@ class UserFunctionCache { const std::string& file_name); void _destroy_cache_entry(std::shared_ptr entry); - std::string _get_real_url(const std::string& url); std::string _get_file_name_from_url(const std::string& url) const; std::vector _split_string_by_checksum(const std::string& file); diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 0fa33bfaad917d..203e793408435d 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -119,23 +119,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { // Add a scoped cleanup jni reference object. This cleans up local refs made below. JniLocalFrame jni_frame; { - std::string local_location; - std::hash hash_str; - auto* function_cache = UserFunctionCache::instance(); - if (_conn_param.resource_name.empty()) { - // for jdbcExternalTable, _conn_param.resource_name == "" - // so, we use _conn_param.driver_path as key of jarpath - SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer); - RETURN_IF_ERROR(function_cache->get_jarpath( - std::abs((int64_t)hash_str(_conn_param.driver_path)), _conn_param.driver_path, - _conn_param.driver_checksum, &local_location)); - } else { - SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer); - RETURN_IF_ERROR(function_cache->get_jarpath( - std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path, - _conn_param.driver_checksum, &local_location)); - } - VLOG_QUERY << "driver local path = " << local_location; + std::string driver_path = _get_real_url(_conn_param.driver_path); TJdbcExecutorCtorParams ctor_params; ctor_params.__set_statement(_sql_str); @@ -144,7 +128,8 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { ctor_params.__set_jdbc_user(_conn_param.user); ctor_params.__set_jdbc_password(_conn_param.passwd); ctor_params.__set_jdbc_driver_class(_conn_param.driver_class); - ctor_params.__set_driver_path(local_location); + ctor_params.__set_driver_path(driver_path); + ctor_params.__set_jdbc_driver_checksum(_conn_param.driver_checksum); if (state == nullptr) { ctor_params.__set_batch_size(read ? 1 : 0); } else { @@ -601,4 +586,12 @@ jobject JdbcConnector::_get_java_table_type(JNIEnv* env, TOdbcTableType::type ta env->CallStaticObjectMethod(enumClass, findByValueMethod, static_cast(tableType)); return javaEnumObj; } + +std::string JdbcConnector::_get_real_url(const std::string& url) { + if (url.find(":/") == std::string::npos) { + return "file://" + config::jdbc_drivers_dir + "/" + url; + } + return url; +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index 954b0abfa78f0c..c23dc11c86598e 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -134,6 +134,8 @@ class JdbcConnector : public TableConnector { int rows); jobject _get_java_table_type(JNIEnv* env, TOdbcTableType::type tableType); + std::string _get_real_url(const std::string& url); + bool _closed = false; jclass _executor_factory_clazz; jclass _executor_clazz; diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java index 4a54ac2d4ce003..7a6263e5232ead 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java @@ -19,7 +19,6 @@ import org.apache.doris.cloud.security.SecurityChecker; import org.apache.doris.common.exception.InternalException; -import org.apache.doris.common.jni.utils.UdfUtils; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ColumnValueConverter; import org.apache.doris.common.jni.vec.VectorColumn; @@ -28,7 +27,9 @@ import org.apache.doris.thrift.TJdbcOperation; import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import com.zaxxer.hikari.HikariDataSource; +import org.apache.commons.codec.binary.Hex; import org.apache.log4j.Logger; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; @@ -36,8 +37,15 @@ import org.semver4j.Semver; import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; import java.lang.reflect.Array; import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.net.URLConnection; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.Date; @@ -58,6 +66,7 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor { private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory(); private HikariDataSource hikariDataSource = null; private final byte[] hikariDataSourceLock = new byte[0]; + private ClassLoader classLoader = null; private Connection conn = null; protected JdbcDataSourceConfig config; protected PreparedStatement preparedStatement = null; @@ -69,6 +78,7 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor { protected int batchSizeNum = 0; protected int curBlockRows = 0; protected String jdbcDriverVersion; + private static final Map classLoaderMap = Maps.newConcurrentMap(); public BaseJdbcExecutor(byte[] thriftParams) throws Exception { setJdbcDriverSystemProperties(); @@ -86,6 +96,7 @@ public BaseJdbcExecutor(byte[] thriftParams) throws Exception { .setJdbcUrl(request.jdbc_url) .setJdbcDriverUrl(request.driver_path) .setJdbcDriverClass(request.jdbc_driver_class) + .setJdbcDriverChecksum(request.jdbc_driver_checksum) .setBatchSize(request.batch_size) .setOp(request.op) .setTableType(request.table_type) @@ -299,8 +310,7 @@ private void init(JdbcDataSourceConfig config, String sql) throws JdbcExecutorEx ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader(); String hikariDataSourceKey = config.createCacheKey(); try { - ClassLoader parent = getClass().getClassLoader(); - ClassLoader classLoader = UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent); + initializeClassLoader(config); Thread.currentThread().setContextClassLoader(classLoader); hikariDataSource = JdbcDataSource.getDataSource().getSource(hikariDataSourceKey); if (hikariDataSource == null) { @@ -358,6 +368,60 @@ private void init(JdbcDataSourceConfig config, String sql) throws JdbcExecutorEx } } + private synchronized void initializeClassLoader(JdbcDataSourceConfig config) + throws MalformedURLException, FileNotFoundException { + try { + URL[] urls = {new URL(config.getJdbcDriverUrl())}; + if (classLoaderMap.containsKey(urls[0])) { + this.classLoader = classLoaderMap.get(urls[0]); + } else { + String expectedChecksum = config.getJdbcDriverChecksum(); + String actualChecksum = computeObjectChecksum(urls[0].toString(), null); + if (!expectedChecksum.equals(actualChecksum)) { + throw new RuntimeException("Checksum mismatch for JDBC driver."); + } + ClassLoader parent = getClass().getClassLoader(); + this.classLoader = URLClassLoader.newInstance(urls, parent); + classLoaderMap.put(urls[0], this.classLoader); + } + } catch (MalformedURLException e) { + throw new RuntimeException("Error loading JDBC driver.", e); + } + } + + public static String computeObjectChecksum(String urlStr, String encodedAuthInfo) { + try (InputStream inputStream = getInputStreamFromUrl(urlStr, encodedAuthInfo, 10000, 10000)) { + MessageDigest digest = MessageDigest.getInstance("MD5"); + byte[] buf = new byte[4096]; + int bytesRead; + while ((bytesRead = inputStream.read(buf)) != -1) { + digest.update(buf, 0, bytesRead); + } + return Hex.encodeHexString(digest.digest()); + } catch (IOException | NoSuchAlgorithmException e) { + throw new RuntimeException("Compute driver checksum from url: " + urlStr + + " encountered an error: " + e.getMessage()); + } + } + + public static InputStream getInputStreamFromUrl(String urlStr, String encodedAuthInfo, int connectTimeoutMs, + int readTimeoutMs) throws IOException { + try { + URL url = new URL(urlStr); + URLConnection conn = url.openConnection(); + + if (encodedAuthInfo != null) { + conn.setRequestProperty("Authorization", "Basic " + encodedAuthInfo); + } + + conn.setConnectTimeout(connectTimeoutMs); + conn.setReadTimeout(readTimeoutMs); + return conn.getInputStream(); + } catch (Exception e) { + throw new IOException("Failed to open URL connection: " + urlStr, e); + } + } + protected void setValidationQuery(HikariDataSource ds) { ds.setConnectionTestQuery("SELECT 1"); } diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java index a99377add2532d..87bb9849fe04ff 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java @@ -27,6 +27,7 @@ public class JdbcDataSourceConfig { private String jdbcPassword; private String jdbcDriverUrl; private String jdbcDriverClass; + private String jdbcDriverChecksum; private int batchSize; private TJdbcOperation op; private TOdbcTableType tableType; @@ -96,6 +97,15 @@ public JdbcDataSourceConfig setJdbcDriverClass(String jdbcDriverClass) { return this; } + public String getJdbcDriverChecksum() { + return jdbcDriverChecksum; + } + + public JdbcDataSourceConfig setJdbcDriverChecksum(String jdbcDriverChecksum) { + this.jdbcDriverChecksum = jdbcDriverChecksum; + return this; + } + public int getBatchSize() { return batchSize; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index 28d58b35297ac3..f189d1e428f77a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -288,6 +288,12 @@ private static void checkCloudWhiteList(String driverUrl) throws IllegalArgument } public static String getFullDriverUrl(String driverUrl) throws IllegalArgumentException { + if (!(driverUrl.startsWith("file://") || driverUrl.startsWith("http://") + || driverUrl.startsWith("https://") || driverUrl.matches("^[^:/]+\\.jar$"))) { + throw new IllegalArgumentException("Invalid driver URL format. Supported formats are: " + + "file://xxx.jar, http://xxx.jar, https://xxx.jar, or xxx.jar (without prefix)."); + } + try { URI uri = new URI(driverUrl); String schema = uri.getScheme(); @@ -489,4 +495,3 @@ public static void checkConnectionPoolProperties(int minSize, int maxSize, int m } } } - diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java index 81c2157686ab19..3cd8f872ce9494 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java @@ -35,6 +35,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import java.util.Map; @@ -216,4 +217,54 @@ public void testJdbcDriverPtah() { }); Assert.assertEquals("Driver URL does not match any allowed paths: file:///postgresql-42.5.0.jar", exception.getMessage()); } + + @Test + public void testValidDriverUrls() { + String fileUrl = "file://path/to/driver.jar"; + Assertions.assertDoesNotThrow(() -> { + String result = JdbcResource.getFullDriverUrl(fileUrl); + Assert.assertEquals(fileUrl, result); + }); + + String httpUrl = "http://example.com/driver.jar"; + Assertions.assertDoesNotThrow(() -> { + String result = JdbcResource.getFullDriverUrl(httpUrl); + Assert.assertEquals(httpUrl, result); + }); + + String httpsUrl = "https://example.com/driver.jar"; + Assertions.assertDoesNotThrow(() -> { + String result = JdbcResource.getFullDriverUrl(httpsUrl); + Assert.assertEquals(httpsUrl, result); + }); + + String jarFile = "driver.jar"; + Assertions.assertDoesNotThrow(() -> { + String result = JdbcResource.getFullDriverUrl(jarFile); + Assert.assertTrue(result.startsWith("file://")); + }); + } + + @Test + public void testInvalidDriverUrls() { + String invalidUrl1 = "/mnt/path/to/driver.jar"; + Assert.assertThrows(IllegalArgumentException.class, () -> { + JdbcResource.getFullDriverUrl(invalidUrl1); + }); + + String invalidUrl2 = "ftp://example.com/driver.jar"; + Assert.assertThrows(IllegalArgumentException.class, () -> { + JdbcResource.getFullDriverUrl(invalidUrl2); + }); + + String invalidUrl3 = ""; + Assert.assertThrows(IllegalArgumentException.class, () -> { + JdbcResource.getFullDriverUrl(invalidUrl3); + }); + + String invalidUrl4 = "example.com/driver"; + Assert.assertThrows(IllegalArgumentException.class, () -> { + JdbcResource.getFullDriverUrl(invalidUrl4); + }); + } } diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 35a78e068de367..dbb385d0d1d572 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -454,6 +454,7 @@ struct TJdbcExecutorCtorParams { 14: optional i32 connection_pool_cache_clear_time 15: optional bool connection_pool_keep_alive 16: optional i64 catalog_id + 17: optional string jdbc_driver_checksum } struct TJavaUdfExecutorCtorParams {