diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 40335a3195620f..406b7b8fdf2fd6 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1187,6 +1187,8 @@ DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128"); /** Hive sink configurations **/ DEFINE_mInt64(hive_sink_max_file_size, "1073741824"); // 1GB +DEFINE_mInt32(thrift_client_open_num_tries, "1"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index b7c0d315c51e65..4d22fbfe52120f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1270,6 +1270,10 @@ DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer); /** Hive sink configurations **/ DECLARE_mInt64(hive_sink_max_file_size); // 1GB +// Maximum number of attempts to open a thrift client; the default 1 means the open is tried only once. +// When it is greater than 1, a failed open is retried, waiting 100 milliseconds between attempts. +DECLARE_mInt32(thrift_client_open_num_tries); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp index 3da31caf5c8922..ea7b43b6102123 100644 --- a/be/src/runtime/client_cache.cpp +++ b/be/src/runtime/client_cache.cpp @@ -114,7 +114,7 @@ Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport, client_impl->set_conn_timeout(config::thrift_connect_timeout_seconds * 1000); - Status status = client_impl->open(); + Status status = client_impl->open_with_retry(config::thrift_client_open_num_tries, 100); if (!status.ok()) { *client_key = nullptr; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 8a71ca5ba44648..612b73908ff27f 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -107,6 +107,7 @@ class RowCache; class DummyLRUCache; class CacheManager; class WalManager; +class DNSCache; inline bool k_doris_exit = false; @@ -213,6 +214,8 @@ class ExecEnv { FileMetaCache* file_meta_cache() { 
return _file_meta_cache; } MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); } WalManager* wal_mgr() { return _wal_manager.get(); } + DNSCache* dns_cache() { return _dns_cache; } + #ifdef BE_TEST void set_ready() { this->_s_ready = true; } void set_not_ready() { this->_s_ready = false; } @@ -364,6 +367,7 @@ class ExecEnv { std::unique_ptr _load_stream_stub_pool; std::unique_ptr _delta_writer_v2_pool; std::shared_ptr _wal_manager; + DNSCache* _dns_cache = nullptr; std::mutex _frontends_lock; // ip:brpc_port -> frontend_indo diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 795cecd1337f42..2a734298e20f99 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -91,6 +91,7 @@ #include "util/brpc_client_cache.h" #include "util/cpu_info.h" #include "util/disk_info.h" +#include "util/dns_cache.h" #include "util/doris_metrics.h" #include "util/mem_info.h" #include "util/metrics.h" @@ -243,6 +244,7 @@ Status ExecEnv::_init(const std::vector& store_paths, _delta_writer_v2_pool = std::make_unique(); _file_cache_open_fd_cache = std::make_unique(); _wal_manager = WalManager::create_shared(this, config::group_commit_wal_path); + _dns_cache = new DNSCache(); _spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths); _backend_client_cache->init_metrics("backend"); _frontend_client_cache->init_metrics("frontend"); @@ -578,6 +580,7 @@ void ExecEnv::destroy() { _delta_writer_v2_pool.reset(); _load_stream_stub_pool.reset(); _file_cache_open_fd_cache.reset(); + SAFE_DELETE(_dns_cache); // StorageEngine must be destoried before _page_no_cache_mem_tracker.reset and _cache_manager destory // shouldn't use SAFE_STOP. otherwise will lead to twice stop. 
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index ff537850854f9e..7b313d6ae023e5 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -40,6 +40,8 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" +#include "runtime/exec_env.h" +#include "util/dns_cache.h" #include "util/network_util.h" namespace doris { @@ -79,7 +81,7 @@ class BrpcClientCache { std::string realhost; realhost = host; if (!is_valid_ip(host)) { - Status status = hostname_to_ip(host, realhost); + Status status = ExecEnv::GetInstance()->dns_cache()->get(host, &realhost); if (!status.ok()) { LOG(WARNING) << "failed to get ip from host:" << status.to_string(); return nullptr;
diff --git a/be/src/util/dns_cache.cpp b/be/src/util/dns_cache.cpp
new file mode 100644
index 00000000000000..f2bd4ce91e6070
--- /dev/null
+++ b/be/src/util/dns_cache.cpp
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/dns_cache.h"
+
+#include <chrono>
+#include <mutex>
+#include <vector>
+
+#include "service/backend_options.h"
+#include "util/network_util.h"
+
+namespace doris {
+
+DNSCache::DNSCache() {
+    // The refresh thread is deliberately NOT detached: it dereferences `this`, so the
+    // destructor must be able to join it before the members are destroyed.
+    refresh_thread = std::thread(&DNSCache::_refresh_cache, this);
+}
+
+DNSCache::~DNSCache() {
+    {
+        std::lock_guard<std::mutex> lock(stop_mutex);
+        stop_refresh = true;
+    }
+    // Wake the refresh thread out of its timed wait so that join() returns promptly.
+    stop_cv.notify_all();
+    if (refresh_thread.joinable()) {
+        refresh_thread.join();
+    }
+}
+
+Status DNSCache::get(const std::string& hostname, std::string* ip) {
+    {
+        std::shared_lock<std::shared_mutex> lock(mutex);
+        auto it = cache.find(hostname);
+        if (it != cache.end()) {
+            *ip = it->second;
+            return Status::OK();
+        }
+    }
+    // Not cached yet: resolve it, store it and return the resolved ip directly.
+    // operator[] is avoided here because it may insert, which a shared lock does not allow.
+    return _update(hostname, ip);
+}
+
+Status DNSCache::_update(const std::string& hostname, std::string* resolved_ip) {
+    std::string real_ip;
+    // hostname_to_ip runs before the write lock is taken, so readers are not blocked by it.
+    RETURN_IF_ERROR(hostname_to_ip(hostname, real_ip, BackendOptions::is_bind_ipv6()));
+    {
+        std::unique_lock<std::shared_mutex> lock(mutex);
+        auto it = cache.find(hostname);
+        if (it == cache.end() || it->second != real_ip) {
+            cache[hostname] = real_ip;
+            LOG(INFO) << "update hostname " << hostname << "'s ip to " << real_ip;
+        }
+    }
+    if (resolved_ip != nullptr) {
+        *resolved_ip = real_ip;
+    }
+    return Status::OK();
+}
+
+void DNSCache::_refresh_cache() {
+    std::unique_lock<std::mutex> stop_lock(stop_mutex);
+    // wait_for returns true as soon as stop_refresh is set (the thread exits) and false
+    // once the 1 min refresh interval has elapsed (time to refresh).
+    while (!stop_cv.wait_for(stop_lock, std::chrono::minutes(1),
+                             [this] { return stop_refresh; })) {
+        // Release stop_mutex while resolving so the destructor can set the flag meanwhile.
+        stop_lock.unlock();
+        std::vector<std::string> keys;
+        {
+            std::shared_lock<std::shared_mutex> lock(mutex);
+            keys.reserve(cache.size());
+            for (const auto& entry : cache) {
+                keys.push_back(entry.first);
+            }
+        }
+        for (const auto& key : keys) {
+            Status st = _update(key);
+            if (!st.ok()) {
+                // _update leaves the cached ip untouched on failure; just report it.
+                LOG(WARNING) << "failed to refresh ip of hostname " << key << ": "
+                             << st.to_string();
+            }
+        }
+        stop_lock.lock();
+    }
+}
+
+} // end of namespace doris
diff --git a/be/src/util/dns_cache.h b/be/src/util/dns_cache.h
new file mode 100644
index 00000000000000..5dc413c53e2c32
--- /dev/null
+++ b/be/src/util/dns_cache.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <shared_mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+
+#include "common/status.h"
+
+namespace doris {
+
+// Same as
+// fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java
+class DNSCache {
+public:
+    // Starts the background thread that refreshes the cached entries.
+    DNSCache();
+    // Stops the background thread and waits for it to exit.
+    ~DNSCache();
+
+    // get ip by hostname
+    // On a cache miss the hostname is resolved and the result is stored in the cache.
+    // If the hostname cannot be resolved, the error is returned and *ip is left untouched.
+    Status get(const std::string& hostname, std::string* ip);
+
+private:
+    // update the ip of hostname in cache
+    // if resolved_ip is not null, the resolved ip is also copied into it
+    Status _update(const std::string& hostname, std::string* resolved_ip = nullptr);
+
+    // a function for refresh daemon thread
+    // update cache at fixed interval until stop_refresh is set
+    void _refresh_cache();
+
+private:
+    // hostname -> ip
+    std::unordered_map<std::string, std::string> cache;
+    // protects cache
+    mutable std::shared_mutex mutex;
+    // stop_mutex/stop_cv protect stop_refresh and let the destructor interrupt the refresh wait
+    std::mutex stop_mutex;
+    std::condition_variable stop_cv;
+    bool stop_refresh = false;
+    // background refresh thread, joined in the destructor
+    std::thread refresh_thread;
+};
+
+} // end of namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 1b479f364639b2..d4a3f0865cea84 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -98,6 +98,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.ConfigBase; import org.apache.doris.common.ConfigException; +import org.apache.doris.common.DNSCache; import org.apache.doris.common.DdlException; import
org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -527,6 +528,8 @@ public class Env { private InsertOverwriteManager insertOverwriteManager; + private DNSCache dnsCache; + public List getFrontendInfos() { List res = new ArrayList<>(); @@ -762,6 +765,7 @@ public Env(boolean isCheckpointCatalog) { "TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo); this.mtmvService = new MTMVService(); this.insertOverwriteManager = new InsertOverwriteManager(); + this.dnsCache = new DNSCache(); } public static void destroyCheckpoint() { @@ -921,6 +925,10 @@ public static HiveTransactionMgr getCurrentHiveTransactionMgr() { return getCurrentEnv().getHiveTransactionMgr(); } + public DNSCache getDnsCache() { + return dnsCache; + } + // Use tryLock to avoid potential dead lock private boolean tryLock(boolean mustLock) { while (true) { @@ -1692,7 +1700,7 @@ protected void startMasterOnlyDaemonThreads() { insertOverwriteManager.start(); } - // start threads that should running on all FE + // start threads that should run on all FE protected void startNonMasterDaemonThreads() { // start load manager thread loadManager.start(); @@ -1709,6 +1717,8 @@ protected void startNonMasterDaemonThreads() { if (Config.enable_hms_events_incremental_sync) { metastoreEventsProcessor.start(); } + + dnsCache.start(); } private void transferToNonMaster(FrontendNodeType newType) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java
new file mode 100644
index 00000000000000..1fe96eba20f5f9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.doris.common.util.NetUtils;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * DNSCache is a class that caches DNS lookups and periodically refreshes them.
+ * It uses a ConcurrentHashMap to store the hostname to IP address mappings and a ScheduledExecutorService
+ * to periodically refresh these mappings.
+ */
+public class DNSCache {
+    private static final Logger LOG = LogManager.getLogger(DNSCache.class);
+
+    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService executor = ThreadPoolManager.newDaemonScheduledThreadPool(1,
+            "dns_cache_pool", true);
+
+    /**
+     * Check if the enable_fqdn_mode configuration is set.
+     * If it is, it schedules a task to refresh the DNS cache every 60 seconds,
+     * starting after an initial delay of 120 seconds.
+     */
+    public void start() {
+        if (Config.enable_fqdn_mode) {
+            executor.scheduleAtFixedRate(this::refresh, 120, 60, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * The get method retrieves the IP address for a given hostname from the cache.
+     * If the hostname is not in the cache, it resolves the hostname and, when that succeeds,
+     * stores the IP address in the cache. A failed resolution is never cached, so a later call retries it.
+     *
+     * @param hostname The hostname for which to get the IP address.
+     * @return The IP address for the given hostname, or an empty string if the hostname cannot be resolved.
+     */
+    public String get(String hostname) {
+        String cachedIp = cache.get(hostname);
+        if (cachedIp != null) {
+            return cachedIp;
+        }
+        String resolvedIp = resolveHostname(hostname);
+        if (resolvedIp.isEmpty()) {
+            // Do not store the failure marker: it would be handed out as an IP until the next refresh.
+            return resolvedIp;
+        }
+        // Another thread may have cached the hostname in the meantime; the first stored value wins.
+        String previousIp = cache.putIfAbsent(hostname, resolvedIp);
+        return previousIp != null ? previousIp : resolvedIp;
+    }
+
+    /**
+     * The resolveHostname method resolves a hostname to an IP address.
+     * If the hostname cannot be resolved, it returns an empty string.
+     *
+     * @param hostname The hostname to resolve.
+     * @return The IP address for the given hostname, or an empty string if the hostname cannot be resolved.
+     */
+    private String resolveHostname(String hostname) {
+        try {
+            return NetUtils.getIpByHost(hostname, 0);
+        } catch (UnknownHostException e) {
+            return "";
+        }
+    }
+
+    /**
+     * The refresh method periodically refreshes the DNS cache.
+     * It iterates over each hostname in the cache and resolves it again.
+     * If the resolution succeeds and the IP address differs from the cached one,
+     * it updates the cache and logs the change.
+     * If the resolution fails, the previously cached IP address is kept.
+     */
+    private void refresh() {
+        for (String hostname : cache.keySet()) {
+            try {
+                String resolvedIp = resolveHostname(hostname);
+                if (resolvedIp.isEmpty()) {
+                    continue;
+                }
+                String currentIp = cache.put(hostname, resolvedIp);
+                if (!resolvedIp.equals(currentIp)) {
+                    LOG.info("IP for hostname {} has changed from {} to {}", hostname, currentIp, resolvedIp);
+                }
+            } catch (Exception e) {
+                // An exception escaping a scheduleAtFixedRate task suppresses all later runs.
+                LOG.warn("failed to refresh IP for hostname {}", hostname, e);
+            }
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java index 0c1ac130cdea0a..9b787f52bf47a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java @@ -95,9 +95,19 @@ public static String getHostnameByIp(String ip) { return hostName; } - public static String getIpByHost(String host) throws UnknownHostException { - InetAddress inetAddress = InetAddress.getByName(host); - return inetAddress.getHostAddress(); + public static String getIpByHost(String host, int retryTimes) throws UnknownHostException { + InetAddress inetAddress; + while (true) { + try { + inetAddress = InetAddress.getByName(host); + return inetAddress.getHostAddress(); + } catch (UnknownHostException e) { + LOG.warn("Get IP by host failed, hostname: {}, remaining retryTimes: {}", host, retryTimes, e); + if (retryTimes-- <= 0) { + throw e; + } + } + } } // This is the implementation is inspired by Apache camel project:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index d78e055a1aba29..af21194263f15b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -17,9 +17,9 @@ package org.apache.doris.rpc; +import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import
org.apache.doris.common.ThreadPoolManager; -import org.apache.doris.common.util.NetUtils; import org.apache.doris.metric.MetricRepo; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.proto.InternalService; @@ -112,7 +112,7 @@ public void removeProxy(TNetworkAddress address) { } private BackendServiceClient getProxy(TNetworkAddress address) throws UnknownHostException { - String realIp = NetUtils.getIpByHost(address.getHostname()); + String realIp = Env.getCurrentEnv().getDnsCache().get(address.hostname); BackendServiceClientExtIp serviceClientExtIp = serviceMap.get(address); if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp) && serviceClientExtIp.client.isNormalState()) {