From 0e157df6ed613bbe71568f67dbe31750fc7e8d1b Mon Sep 17 00:00:00 2001 From: stalary Date: Sat, 30 Jan 2021 17:17:01 +0800 Subject: [PATCH 01/13] MOD: support fe connect https es --- .../org/apache/doris/catalog/Catalog.java | 2 + .../org/apache/doris/catalog/EsTable.java | 59 +++++++++++++------ .../external/elasticsearch/EsNodeInfo.java | 30 ++++++++++ .../external/elasticsearch/EsRepository.java | 2 +- .../external/elasticsearch/EsRestClient.java | 58 ++++++++++++++++-- .../doris/external/elasticsearch/EsUtil.java | 30 +++++++--- .../elasticsearch/PartitionPhase.java | 11 +++- .../external/elasticsearch/SearchContext.java | 8 +++ gensrc/thrift/Descriptors.thrift | 1 + 9 files changed, 168 insertions(+), 33 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index cd54710f846062..742e4cc392d563 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4150,6 +4150,8 @@ public static void getDdlStmt(String dbName, Table table, List createTab sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n"); sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n"); sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n"); + sb.append("\"es_nodes_discovery\" = \"").append(esTable.isEsNodesDiscovery()).append("\"\n"); + sb.append("\"use_ssl_client\" = \"").append(esTable.isUseSslClient()).append("\"\n"); sb.append(")"); } else if (table.getType() == TableType.HIVE) { HiveTable hiveTable = (HiveTable) table; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java index 8cae113f26a073..bf679c19477614 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java @@ -24,6 +24,7 @@ import org.apache.doris.external.elasticsearch.EsMetaStateTracker; import org.apache.doris.external.elasticsearch.EsRestClient; import org.apache.doris.external.elasticsearch.EsTablePartitions; +import org.apache.doris.external.elasticsearch.EsUtil; import org.apache.doris.thrift.TEsTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -63,6 +64,8 @@ public class EsTable extends Table { public static final String DOC_VALUE_SCAN = "enable_docvalue_scan"; public static final String KEYWORD_SNIFF = "enable_keyword_sniff"; public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields"; + public static final String ES_NODES_DISCOVERY = "es_nodes_discovery"; + public static final String USE_SSL_CLIENT = "use_ssl_client"; private String hosts; private String[] seeds; @@ -87,6 +90,10 @@ public class EsTable extends Table { // would downgrade to extract value from `stored_fields` private int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS; + private boolean esNodesDiscovery = true; + + private boolean useSslClient = false; + // Solr doc_values vs stored_fields performance-smackdown indicate: // It is possible to notice that retrieving an high number of fields leads // to a sensible worsening of performance if DocValues are used. @@ -138,6 +145,13 @@ public boolean isKeywordSniffEnable() { return enableKeywordSniff; } + public boolean isEsNodesDiscovery() { + return esNodesDiscovery; + } + + public boolean isUseSslClient() { + return useSslClient; + } private void validate(Map properties) throws DdlException { if (properties == null) { @@ -185,36 +199,31 @@ private void validate(Map properties) throws DdlException { // enable doc value scan for Elasticsearch if (properties.containsKey(DOC_VALUE_SCAN)) { - try { - enableDocValueScan = Boolean.parseBoolean(properties.get(DOC_VALUE_SCAN).trim()); - } catch (Exception e) { - throw new DdlException("fail to parse enable_docvalue_scan, enable_docvalue_scan= " - + properties.get(VERSION).trim() + " ,`enable_docvalue_scan`" - + " should be like 'true' or 'false', value should be double quotation marks"); - } + enableDocValueScan = EsUtil.getBooleanProperty(properties, DOC_VALUE_SCAN); } if (properties.containsKey(KEYWORD_SNIFF)) { - try { - enableKeywordSniff = Boolean.parseBoolean(properties.get(KEYWORD_SNIFF).trim()); - } catch (Exception e) { - throw new DdlException("fail to parse enable_keyword_sniff, enable_keyword_sniff= " - + properties.get(VERSION).trim() + " ,`enable_keyword_sniff`" - + " should be like 'true' or 'false', value should be double quotation marks"); - } - } else { - enableKeywordSniff = true; + enableKeywordSniff = EsUtil.getBooleanProperty(properties, KEYWORD_SNIFF); + } + + if (properties.containsKey(ES_NODES_DISCOVERY)) { + esNodesDiscovery = EsUtil.getBooleanProperty(properties, ES_NODES_DISCOVERY); + } + + if (properties.containsKey(USE_SSL_CLIENT)) { + useSslClient = EsUtil.getBooleanProperty(properties, USE_SSL_CLIENT); } if (!Strings.isNullOrEmpty(properties.get(TYPE)) && !Strings.isNullOrEmpty(properties.get(TYPE).trim())) { mappingType = properties.get(TYPE).trim(); } + if (!Strings.isNullOrEmpty(properties.get(TRANSPORT)) && !Strings.isNullOrEmpty(properties.get(TRANSPORT).trim())) { transport = properties.get(TRANSPORT).trim(); if (!(TRANSPORT_HTTP.equals(transport) || TRANSPORT_THRIFT.equals(transport))) { - throw new DdlException("transport of ES table must be http(recommend) or thrift(reserved inner usage)," + throw new DdlException("transport of ES table must be http/https(recommend) or thrift(reserved inner usage)," + " but value is " + transport); } } @@ -241,10 +250,13 @@ private void validate(Map properties) throws DdlException { tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan)); tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff)); tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields)); + tableContext.put(ES_NODES_DISCOVERY, String.valueOf(esNodesDiscovery)); + tableContext.put(USE_SSL_CLIENT, String.valueOf(useSslClient)); } public TTableDescriptor toThrift() { TEsTable tEsTable = new TEsTable(); + tEsTable.setUseSslClient(useSslClient); TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ES_TABLE, fullSchema.size(), 0, getName(), ""); tTableDescriptor.setEsTable(tEsTable); @@ -323,7 +335,16 @@ public void readFields(DataInput in) throws IOException { maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS; } } - + if (tableContext.containsKey(ES_NODES_DISCOVERY)) { + esNodesDiscovery = Boolean.parseBoolean(tableContext.get(ES_NODES_DISCOVERY)); + } else { + esNodesDiscovery = true; + } + if (tableContext.containsKey(USE_SSL_CLIENT)) { + useSslClient = Boolean.parseBoolean(tableContext.get(USE_SSL_CLIENT)); + } else { + useSslClient = false; + } PartitionType partType = PartitionType.valueOf(Text.readString(in)); if (partType == PartitionType.UNPARTITIONED) { partitionInfo = SinglePartitionInfo.read(in); @@ -357,6 +378,8 @@ public void readFields(DataInput in) throws IOException { tableContext.put("transport", transport); tableContext.put("enableDocValueScan", "false"); tableContext.put(KEYWORD_SNIFF, "true"); + tableContext.put(ES_NODES_DISCOVERY, "true"); + tableContext.put(USE_SSL_CLIENT, "false"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java index 73d8daf5b31242..918e240c93e8f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java @@ -19,6 +19,9 @@ import org.apache.doris.thrift.TNetworkAddress; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; import java.util.Map; @@ -38,6 +41,8 @@ public class EsNodeInfo { private boolean hasThrift; private TNetworkAddress thriftAddress; + private static final Logger LOG = LogManager.getLogger(EsNodeInfo.class); + public EsNodeInfo(String id, Map map) throws DorisEsException { this.id = id; EsMajorVersion version = EsMajorVersion.parse((String) map.get("version")); @@ -96,6 +101,31 @@ public EsNodeInfo(String id, Map map) throws DorisEsException { } } + public EsNodeInfo(String id, String seed, boolean useSslClient) throws DorisEsException { + this.id = id; + if (seed.startsWith("http://")) { + seed = seed.substring(7); + } + if (seed.startsWith("https://")) { + seed = seed.substring(8); + } + String[] scratch = seed.split(":"); + int port = 80; + String remoteHost = (useSslClient ? "https://" : "http://") + scratch[0]; + if (scratch.length == 2) { + port = Integer.parseInt(scratch[1]); + } + LOG.info("--------------- remoteHost={}, port={} -------------", remoteHost, port); + this.name = remoteHost; + this.host = remoteHost; + this.ip = remoteHost; + this.isClient = true; + this.isData = true; + this.isIngest = true; + this.publishAddress = new TNetworkAddress(remoteHost, port); + this.hasHttp = true; + } + public boolean hasHttp() { return hasHttp; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java index 98e895e61ca500..16d2c3d78ec43e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java @@ -59,7 +59,7 @@ public void registerTable(EsTable esTable) { } esTables.put(esTable.getId(), esTable); esClients.put(esTable.getId(), - new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd())); + new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd(), esTable.isUseSslClient())); LOG.info("register a new table [{}] to sync list", esTable); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index f2868aa5eedc34..9e36d7973340ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -27,11 +27,20 @@ import org.codehaus.jackson.map.SerializationConfig; import java.io.IOException; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; import okhttp3.Credentials; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -48,16 +57,14 @@ public class EsRestClient { mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false); } - private static OkHttpClient networkClient = new OkHttpClient.Builder() - .readTimeout(10, TimeUnit.SECONDS) - .build(); + private static OkHttpClient networkClient; private Request.Builder builder; private String[] nodes; private String currentNode; private int currentNodeIndex = 0; - public EsRestClient(String[] nodes, String authUser, String authPassword) { + public EsRestClient(String[] nodes, String authUser, String authPassword, boolean useSslClient) { this.nodes = nodes; this.builder = new Request.Builder(); if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) { @@ -65,6 +72,18 @@ public EsRestClient(String[] nodes, String authUser, String authPassword) { Credentials.basic(authUser, authPassword)); } this.currentNode = nodes[currentNodeIndex]; + if (useSslClient) { + LOG.info("use ssl client"); + networkClient = new OkHttpClient.Builder() + .readTimeout(10, TimeUnit.SECONDS) + .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts()) + .hostnameVerifier(new TrustAllHostnameVerifier()) + .build(); + } else { + networkClient = new OkHttpClient.Builder() + .readTimeout(10, TimeUnit.SECONDS) + .build(); + } } private void selectNextNode() { @@ -207,4 +226,35 @@ private T parseContent(String response, String key) { } return (T) (key != null ? map.get(key) : map); } + + /** + * support https + **/ + private static class TrustAllCerts implements X509TrustManager { + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {} + + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {} + + public X509Certificate[] getAcceptedIssuers() {return new X509Certificate[0];} + } + + private static class TrustAllHostnameVerifier implements HostnameVerifier { + public boolean verify(String hostname, SSLSession session) { + return true; + } + } + + private static SSLSocketFactory createSSLSocketFactory() { + SSLSocketFactory ssfFactory = null; + + try { + SSLContext sc = SSLContext.getInstance("TLS"); + sc.init(null, new TrustManager[]{new TrustAllCerts()}, new SecureRandom()); + + ssfFactory = sc.getSocketFactory(); + } catch (Exception e) { + } + + return ssfFactory; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java index d25a01fe7898ae..c55661f2368d9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java @@ -21,43 +21,46 @@ import org.apache.doris.analysis.PartitionDesc; import org.apache.doris.analysis.RangePartitionDesc; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.json.JSONObject; +import java.util.Map; + public class EsUtil { - + public static void analyzePartitionAndDistributionDesc(PartitionDesc partitionDesc, - DistributionDesc distributionDesc) throws AnalysisException { + DistributionDesc distributionDesc) throws AnalysisException { if (partitionDesc == null && distributionDesc == null) { return; } - + if (partitionDesc != null) { if (!(partitionDesc instanceof RangePartitionDesc)) { throw new AnalysisException("Elasticsearch table only permit range partition"); } - + RangePartitionDesc rangePartitionDesc = (RangePartitionDesc) partitionDesc; analyzePartitionDesc(rangePartitionDesc); } - + if (distributionDesc != null) { throw new AnalysisException("could not support distribution clause"); } } - + private static void analyzePartitionDesc(RangePartitionDesc partDesc) throws AnalysisException { if (partDesc.getPartitionColNames() == null || partDesc.getPartitionColNames().isEmpty()) { throw new AnalysisException("No partition columns."); } - + if (partDesc.getPartitionColNames().size() > 1) { throw new AnalysisException( "Elasticsearch table's partition column could only be a single column"); } } - - + + /** * get the json object from specified jsonObject * @@ -82,4 +85,13 @@ public static JSONObject getJsonObject(JSONObject jsonObject, String key, int fr return null; } } + + public static boolean getBooleanProperty(Map properties, String name) throws DdlException { + String property = properties.get(name).trim(); + try { + return Boolean.parseBoolean(property); + } catch (Exception e) { + throw new DdlException(String.format("fail to parse %s, %s = %s, `%s` should be like 'true' or 'false', value should be double quotation marks", name, name, property, name)); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java index de1bb76d60667f..6e905b6d690f3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.EsTable; +import java.util.HashMap; import java.util.Map; /** @@ -37,7 +38,15 @@ public PartitionPhase(EsRestClient client) { @Override public void execute(SearchContext context) throws DorisEsException { shardPartitions = client.searchShards(context.sourceIndex()); - nodesInfo = client.getHttpNodes(); + if (context.esNodesDiscovery()) { + nodesInfo = client.getHttpNodes(); + } else { + nodesInfo = new HashMap<>(); + String[] seeds = context.esTable().getSeeds(); + for (int i = 0; i < seeds.length; i++) { + nodesInfo.put(String.valueOf(i), new EsNodeInfo(String.valueOf(i), seeds[i], context.esTable().isUseSslClient())); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java index 06b4c7dde8f12a..f306415ec61b33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java @@ -84,12 +84,16 @@ public class SearchContext { // the ES cluster version private EsMajorVersion version; + // whether the nodes needs to be discovered + private boolean esNodesDiscovery; + public SearchContext(EsTable table) { this.table = table; fullSchema = table.getFullSchema(); sourceIndex = table.getIndexName(); type = table.getMappingType(); + esNodesDiscovery = table.isEsNodesDiscovery(); } @@ -142,4 +146,8 @@ public EsShardPartitions partitions() { public EsTablePartitions tablePartitions() throws Exception { return EsTablePartitions.fromShardPartitions(table, shardPartitions); } + + public boolean esNodesDiscovery() { + return esNodesDiscovery; + } } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index ba5de789d90b71..c914dfa1843b0d 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -227,6 +227,7 @@ struct TOdbcTable { } struct TEsTable { + 1: required bool useSslClient } struct TSchemaTable { From d019eb9252c6f8f1ca15805beb43b0b19f0f8faf Mon Sep 17 00:00:00 2001 From: stalary Date: Sun, 21 Feb 2021 23:15:01 +0800 Subject: [PATCH 02/13] MOD: add be ssl switch --- be/src/exec/es/es_scan_reader.cpp | 12 ++++++++++++ be/src/exec/es/es_scan_reader.h | 3 +++ be/src/http/http_client.h | 5 +++++ .../java/org/apache/doris/planner/EsScanNode.java | 1 + 4 files changed, 21 insertions(+) diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp index 7dc38091df3d4f..99d63d5da94a96 100644 --- a/be/src/exec/es/es_scan_reader.cpp +++ b/be/src/exec/es/es_scan_reader.cpp @@ -60,6 +60,9 @@ ESScanReader::ESScanReader(const std::string& target, if (props.find(KEY_QUERY) != props.end()) { _query = props.at(KEY_QUERY); } + if (props.find(KEY_USE_SSL_CLIENT) != props.end()) { + _use_ssl_client = props.at(KEY_USE_SSL_CLIENT); + } std::string batch_size_str = props.at(KEY_BATCH_SIZE); _batch_size = atoi(batch_size_str.c_str()); @@ -103,6 +106,9 @@ Status ESScanReader::open() { } _network_client.set_basic_auth(_user_name, _passwd); _network_client.set_content_type("application/json"); + if (_use_ssl_client == "true") { + _network_client.set_ssl(); + } // phase open, we cached the first response for `get_next` phase Status status = _network_client.execute_post_request(_query, &_cached_response); if (!status.ok() || _network_client.get_http_status() != 200) { @@ -134,6 +140,9 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr& scr _network_client.set_basic_auth(_user_name, _passwd); _network_client.set_content_type("application/json"); _network_client.set_timeout_ms(_http_timeout_ms); + if (_use_ssl_client == "true") { + _network_client.set_ssl(); + } RETURN_IF_ERROR(_network_client.execute_post_request( ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive), &response)); @@ -188,6 +197,9 @@ Status ESScanReader::close() { _network_client.set_method(DELETE); _network_client.set_content_type("application/json"); _network_client.set_timeout_ms(5 * 1000); + if (_use_ssl_client == "true") { + _network_client.set_ssl(); + } std::string response; RETURN_IF_ERROR(_network_client.execute_delete_request( ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response)); diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h index 6a1f9d4a41066c..f12d9ed2f8026c 100644 --- a/be/src/exec/es/es_scan_reader.h +++ b/be/src/exec/es/es_scan_reader.h @@ -40,6 +40,7 @@ class ESScanReader { static constexpr const char* KEY_BATCH_SIZE = "batch_size"; static constexpr const char* KEY_TERMINATE_AFTER = "limit"; static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode"; + static constexpr const char* KEY_USE_SSL_CLIENT = "use_ssl_client"; ESScanReader(const std::string& target, const std::map& props, bool doc_value_mode); ~ESScanReader(); @@ -63,6 +64,8 @@ class ESScanReader { std::string _query; // Elasticsearch shards to fetch document std::string _shards; + // whether use ssl client + std::string _use_ssl_client; // distinguish the first scroll phase and the following scroll bool _is_first; diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index cdbe1435c8cd93..b6ebab198c7b56 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -66,6 +66,11 @@ class HttpClient { curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str()); } + void set_ssl() { + curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYPEER, false); + curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYHOST, false); + } + // TODO(zc): support set header // void set_header(const std::string& key, const std::string& value) { // _cntl.http_request().SetHeader(key, value); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java index d075946a3d781e..4662a887b0c5d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -148,6 +148,7 @@ protected void toThrift(TPlanNode msg) { Map properties = Maps.newHashMap(); properties.put(EsTable.USER, table.getUserName()); properties.put(EsTable.PASSWORD, table.getPasswd()); + properties.put(EsTable.USE_SSL_CLIENT, String.valueOf(table.isUseSslClient())); TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt()); esScanNode.setProperties(properties); if (table.isDocValueScanEnable()) { From 343710221f4cfa07028d06d49c3eeb60b44d7bdd Mon Sep 17 00:00:00 2001 From: stalary Date: Tue, 23 Feb 2021 22:39:21 +0800 Subject: [PATCH 03/13] MOD: modify some code --- be/src/exec/es/es_scan_reader.cpp | 14 +++++++------- be/src/exec/es/es_scan_reader.h | 2 +- be/src/http/http_client.h | 2 +- .../java/org/apache/doris/catalog/EsTable.java | 10 +++++++++- .../external/elasticsearch/EsNodeInfo.java | 18 +++++++----------- .../external/elasticsearch/EsRestClient.java | 7 ++----- gensrc/thrift/Descriptors.thrift | 1 - 7 files changed, 27 insertions(+), 27 deletions(-) diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp index 99d63d5da94a96..bebb9c1a574887 100644 --- a/be/src/exec/es/es_scan_reader.cpp +++ b/be/src/exec/es/es_scan_reader.cpp @@ -61,7 +61,7 @@ ESScanReader::ESScanReader(const std::string& target, _query = props.at(KEY_QUERY); } if (props.find(KEY_USE_SSL_CLIENT) != props.end()) { - _use_ssl_client = props.at(KEY_USE_SSL_CLIENT); + std::istringstream(props.at(KEY_USE_SSL_CLIENT)) >> std::boolalpha >> _use_ssl_client; } std::string batch_size_str = props.at(KEY_BATCH_SIZE); @@ -106,8 +106,8 @@ Status ESScanReader::open() { } _network_client.set_basic_auth(_user_name, _passwd); _network_client.set_content_type("application/json"); - if (_use_ssl_client == "true") { - _network_client.set_ssl(); + if (_use_ssl_client) { + _network_client.use_ssl(); } // phase open, we cached the first response for `get_next` phase Status status = _network_client.execute_post_request(_query, &_cached_response); @@ -140,8 +140,8 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr& scr _network_client.set_basic_auth(_user_name, _passwd); _network_client.set_content_type("application/json"); _network_client.set_timeout_ms(_http_timeout_ms); - if (_use_ssl_client == "true") { - _network_client.set_ssl(); + if (_use_ssl_client) { + _network_client.use_ssl(); } RETURN_IF_ERROR(_network_client.execute_post_request( ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive), @@ -197,8 +197,8 @@ Status ESScanReader::close() { _network_client.set_method(DELETE); _network_client.set_content_type("application/json"); _network_client.set_timeout_ms(5 * 1000); - if (_use_ssl_client == "true") { - _network_client.set_ssl(); + if (_use_ssl_client) { + _network_client.use_ssl(); } std::string response; RETURN_IF_ERROR(_network_client.execute_delete_request( diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h index f12d9ed2f8026c..2203622b9caed6 100644 --- a/be/src/exec/es/es_scan_reader.h +++ b/be/src/exec/es/es_scan_reader.h @@ -65,7 +65,7 @@ class ESScanReader { // Elasticsearch shards to fetch document std::string _shards; // whether use ssl client - std::string _use_ssl_client; + bool _use_ssl_client; // distinguish the first scroll phase and the following scroll bool _is_first; diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index b6ebab198c7b56..ad345bcb15fd30 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -66,7 +66,7 @@ class HttpClient { curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str()); } - void set_ssl() { + void use_ssl() { curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYPEER, false); curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYHOST, false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java index bf679c19477614..f610b776bc9073 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java @@ -212,6 +212,15 @@ private void validate(Map properties) throws DdlException { if (properties.containsKey(USE_SSL_CLIENT)) { useSslClient = EsUtil.getBooleanProperty(properties, USE_SSL_CLIENT); + // check protocol + for (String seed : seeds) { + if (useSslClient && seed.startsWith("http://")) { + throw new DdlException("if use_ssl_client is true, the https protocol must be used"); + } + if (!useSslClient && seed.startsWith("https://")) { + throw new DdlException("if use_ssl_client is false, the http protocol must be used"); + } + } } if (!Strings.isNullOrEmpty(properties.get(TYPE)) @@ -256,7 +265,6 @@ private void validate(Map properties) throws DdlException { public TTableDescriptor toThrift() { TEsTable tEsTable = new TEsTable(); - tEsTable.setUseSslClient(useSslClient); TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ES_TABLE, fullSchema.size(), 0, getName(), ""); tTableDescriptor.setEsTable(tEsTable); diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java index 918e240c93e8f5..c41f8e06130f9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java @@ -43,7 +43,7 @@ public class EsNodeInfo { private static final Logger LOG = LogManager.getLogger(EsNodeInfo.class); - public EsNodeInfo(String id, Map map) throws DorisEsException { + public EsNodeInfo(String id, Map map) { this.id = id; EsMajorVersion version = EsMajorVersion.parse((String) map.get("version")); this.name = (String) map.get("name"); @@ -101,21 +101,17 @@ public EsNodeInfo(String id, Map map) throws DorisEsException { } } - public EsNodeInfo(String id, String seed, boolean useSslClient) throws DorisEsException { + public EsNodeInfo(String id, String seed, boolean useSslClient) { this.id = id; - if (seed.startsWith("http://")) { - seed = seed.substring(7); - } - if (seed.startsWith("https://")) { - seed = seed.substring(8); + if (!seed.startsWith("http")) { + seed = useSslClient ? "https://" + seed : "http://" + seed; } String[] scratch = seed.split(":"); int port = 80; - String remoteHost = (useSslClient ? "https://" : "http://") + scratch[0]; - if (scratch.length == 2) { - port = Integer.parseInt(scratch[1]); + if (scratch.length == 3) { + port = Integer.parseInt(scratch[2]); } - LOG.info("--------------- remoteHost={}, port={} -------------", remoteHost, port); + String remoteHost = scratch[0] + ":" + scratch[1]; this.name = remoteHost; this.host = remoteHost; this.ip = remoteHost; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index 9e36d7973340ad..5f12f50ce6e9a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -73,7 +73,6 @@ public EsRestClient(String[] nodes, String authUser, String authPassword, boolea } this.currentNode = nodes[currentNodeIndex]; if (useSslClient) { - LOG.info("use ssl client"); networkClient = new OkHttpClient.Builder() .readTimeout(10, TimeUnit.SECONDS) .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts()) @@ -245,16 +244,14 @@ public boolean verify(String hostname, SSLSession session) { } private static SSLSocketFactory createSSLSocketFactory() { - SSLSocketFactory ssfFactory = null; - + SSLSocketFactory ssfFactory; try { SSLContext sc = SSLContext.getInstance("TLS"); sc.init(null, new TrustManager[]{new TrustAllCerts()}, new SecureRandom()); - ssfFactory = sc.getSocketFactory(); } catch (Exception e) { + throw new DorisEsException("createSSLSocketFactory error"); } - return ssfFactory; } } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index c914dfa1843b0d..ba5de789d90b71 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -227,7 +227,6 @@ struct TOdbcTable { } struct TEsTable { - 1: required bool useSslClient } struct TSchemaTable { From b8fc83d7496ab332d89b15e5e3c9a302d131f850 Mon Sep 17 00:00:00 2001 From: stalary Date: Wed, 24 Feb 2021 16:47:13 +0800 Subject: [PATCH 04/13] MOD: modify some code --- be/src/exec/es/es_scan_reader.h | 2 +- be/src/http/http_client.h | 4 ++-- .../apache/doris/external/elasticsearch/PartitionPhase.java | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h index 2203622b9caed6..c4802690f2b49e 100644 --- a/be/src/exec/es/es_scan_reader.h +++ b/be/src/exec/es/es_scan_reader.h @@ -65,7 +65,7 @@ class ESScanReader { // Elasticsearch shards to fetch document std::string _shards; // whether use ssl client - bool _use_ssl_client; + bool _use_ssl_client = false; // distinguish the first scroll phase and the following scroll bool _is_first; diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index ad345bcb15fd30..1b9080db157ed2 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -67,8 +67,8 @@ class HttpClient { } void use_ssl() { - curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYPEER, false); - curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYHOST, false); + curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYHOST, 0L); } // TODO(zc): support set header diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java index 6e905b6d690f3f..97aeab04c044c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java @@ -43,8 +43,9 @@ public void execute(SearchContext context) throws DorisEsException { } else { nodesInfo = new HashMap<>(); String[] seeds = context.esTable().getSeeds(); + boolean useSSL = context.esTable().isUseSslClient(); for (int i = 0; i < seeds.length; i++) { - nodesInfo.put(String.valueOf(i), new EsNodeInfo(String.valueOf(i), seeds[i], context.esTable().isUseSslClient())); + nodesInfo.put(String.valueOf(i), new EsNodeInfo(String.valueOf(i), seeds[i], useSSL)); } } } From 671930362c7a77e34111c7e7c34880b34f88fcf9 Mon Sep 17 00:00:00 2001 From: stalary Date: Thu, 4 Mar 2021 00:10:14 +0800 Subject: [PATCH 05/13] MOD: modify code format, add comment --- be/src/exec/es/es_scan_reader.cpp | 6 +++--- be/src/http/http_client.h | 3 ++- .../doris/external/elasticsearch/EsUtil.java | 21 ++++++++++--------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp index bebb9c1a574887..1957609c24b449 100644 --- a/be/src/exec/es/es_scan_reader.cpp +++ b/be/src/exec/es/es_scan_reader.cpp @@ -107,7 +107,7 @@ Status ESScanReader::open() { _network_client.set_basic_auth(_user_name, _passwd); _network_client.set_content_type("application/json"); if (_use_ssl_client) { - _network_client.use_ssl(); + _network_client.use_untrusted_ssl(); } // phase open, we cached the first response for `get_next` phase Status status = _network_client.execute_post_request(_query, &_cached_response); @@ -141,7 +141,7 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr& scr _network_client.set_content_type("application/json"); _network_client.set_timeout_ms(_http_timeout_ms); if (_use_ssl_client) { - _network_client.use_ssl(); + _network_client.use_untrusted_ssl(); } RETURN_IF_ERROR(_network_client.execute_post_request( ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive), @@ -198,7 +198,7 @@ Status ESScanReader::close() { _network_client.set_content_type("application/json"); _network_client.set_timeout_ms(5 * 1000); if (_use_ssl_client) { - _network_client.use_ssl(); + _network_client.use_untrusted_ssl(); } std::string response; RETURN_IF_ERROR(_network_client.execute_delete_request( diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index 1b9080db157ed2..5fec678345383f 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -66,7 +66,8 @@ class HttpClient { curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str()); } - void use_ssl() { + // Currently, only fake SSL configurations are supported + void use_untrusted_ssl() { curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYPEER, 0L); curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYHOST, 0L); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java index c55661f2368d9d..09b4d7e3e5264b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java @@ -22,45 +22,46 @@ import org.apache.doris.analysis.RangePartitionDesc; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; + import org.json.JSONObject; import java.util.Map; public class EsUtil { - + public static void analyzePartitionAndDistributionDesc(PartitionDesc partitionDesc, - DistributionDesc distributionDesc) throws AnalysisException { + DistributionDesc distributionDesc) throws AnalysisException { if (partitionDesc == null && distributionDesc == null) { return; } - + if (partitionDesc != null) { if (!(partitionDesc instanceof RangePartitionDesc)) { throw new AnalysisException("Elasticsearch table only permit range partition"); } - + RangePartitionDesc rangePartitionDesc = (RangePartitionDesc) partitionDesc; analyzePartitionDesc(rangePartitionDesc); } - + if (distributionDesc != null) { throw new AnalysisException("could not support distribution clause"); } } - + private static void analyzePartitionDesc(RangePartitionDesc partDesc) throws AnalysisException { if (partDesc.getPartitionColNames() == null || partDesc.getPartitionColNames().isEmpty()) { throw new AnalysisException("No partition columns."); } - + if (partDesc.getPartitionColNames().size() > 1) { throw new AnalysisException( "Elasticsearch table's partition column could only be a single column"); } } - - + + /** * get the json object from specified jsonObject * @@ -85,7 +86,7 @@ public static JSONObject getJsonObject(JSONObject jsonObject, String key, int fr return null; } } - + public static boolean getBooleanProperty(Map properties, String name) throws DdlException { String property = properties.get(name).trim(); try { From 6eda420d00b363ac49286b0bc86062f3e27f8f4d Mon Sep 17 00:00:00 2001 From: stalary Date: Sun, 7 Mar 2021 11:18:10 +0800 Subject: [PATCH 06/13] MOD: modify propName from use_ssl_client to es_net_ssl,remove manual add protocol prefix code, remove networkClient static --- be/src/exec/es/es_scan_reader.cpp | 4 +-- be/src/exec/es/es_scan_reader.h | 2 +- .../org/apache/doris/catalog/Catalog.java | 2 +- .../org/apache/doris/catalog/EsTable.java | 36 +++++++++---------- .../external/elasticsearch/EsNodeInfo.java | 5 +-- .../external/elasticsearch/EsRepository.java | 2 +- .../external/elasticsearch/EsRestClient.java | 2 +- .../doris/external/elasticsearch/EsUtil.java | 2 +- .../elasticsearch/PartitionPhase.java | 3 +- .../org/apache/doris/planner/EsScanNode.java | 2 +- 10 files changed, 28 insertions(+), 32 deletions(-) diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp index 1957609c24b449..a0a139d10ecae2 100644 --- a/be/src/exec/es/es_scan_reader.cpp +++ b/be/src/exec/es/es_scan_reader.cpp @@ -60,8 +60,8 @@ ESScanReader::ESScanReader(const std::string& target, if (props.find(KEY_QUERY) != props.end()) { _query = props.at(KEY_QUERY); } - if (props.find(KEY_USE_SSL_CLIENT) != props.end()) { - std::istringstream(props.at(KEY_USE_SSL_CLIENT)) >> std::boolalpha >> _use_ssl_client; + if (props.find(KEY_ES_NET_SSL) != props.end()) { + std::istringstream(props.at(KEY_ES_NET_SSL)) >> std::boolalpha >> _use_ssl_client; } std::string batch_size_str = props.at(KEY_BATCH_SIZE); diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h index c4802690f2b49e..400de4dd130c53 100644 --- a/be/src/exec/es/es_scan_reader.h +++ b/be/src/exec/es/es_scan_reader.h @@ -40,7 +40,7 @@ class ESScanReader { static constexpr const char* KEY_BATCH_SIZE = "batch_size"; static constexpr const char* KEY_TERMINATE_AFTER = "limit"; static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode"; - static constexpr const char* KEY_USE_SSL_CLIENT = "use_ssl_client"; + static constexpr const char* KEY_ES_NET_SSL = "es_net_ssl"; ESScanReader(const std::string& target, const std::map& props, bool doc_value_mode); ~ESScanReader(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 742e4cc392d563..423ef93bd1982d 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4151,7 +4151,7 @@ public static void getDdlStmt(String dbName, Table table, List createTab sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n"); sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n"); sb.append("\"es_nodes_discovery\" = \"").append(esTable.isEsNodesDiscovery()).append("\"\n"); - sb.append("\"use_ssl_client\" = \"").append(esTable.isUseSslClient()).append("\"\n"); + sb.append("\"es_net_ssl\" = \"").append(esTable.isEsNetSsl()).append("\"\n"); sb.append(")"); } else if (table.getType() == TableType.HIVE) { HiveTable hiveTable = (HiveTable) table; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java index f610b776bc9073..cb8bd72b6bb77a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java @@ -65,7 +65,7 @@ public class EsTable extends Table { public static final String KEYWORD_SNIFF = "enable_keyword_sniff"; public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields"; public static final String ES_NODES_DISCOVERY = "es_nodes_discovery"; - public static final String USE_SSL_CLIENT = "use_ssl_client"; + public static final String ES_NET_SSL = "es_net_ssl"; private String hosts; private String[] seeds; @@ -92,7 +92,7 @@ public class EsTable extends Table { private boolean esNodesDiscovery = true; - private boolean useSslClient = false; + private boolean esNetSsl = false; // Solr doc_values vs stored_fields performance-smackdown indicate: // It is possible to notice that retrieving an high number of fields leads @@ -149,8 +149,8 @@ public boolean isEsNodesDiscovery() { return esNodesDiscovery; } - public boolean isUseSslClient() { - return useSslClient; + public boolean isEsNetSsl() { + return esNetSsl; } private void validate(Map properties) throws DdlException { @@ -199,26 +199,26 @@ private void validate(Map properties) throws DdlException { // enable doc value scan for Elasticsearch if (properties.containsKey(DOC_VALUE_SCAN)) { - enableDocValueScan = EsUtil.getBooleanProperty(properties, DOC_VALUE_SCAN); + enableDocValueScan = EsUtil.getBoolean(properties, DOC_VALUE_SCAN); } if (properties.containsKey(KEYWORD_SNIFF)) { - enableKeywordSniff = EsUtil.getBooleanProperty(properties, KEYWORD_SNIFF); + enableKeywordSniff = EsUtil.getBoolean(properties, KEYWORD_SNIFF); } if (properties.containsKey(ES_NODES_DISCOVERY)) { - esNodesDiscovery = EsUtil.getBooleanProperty(properties, ES_NODES_DISCOVERY); + esNodesDiscovery = EsUtil.getBoolean(properties, ES_NODES_DISCOVERY); } - if (properties.containsKey(USE_SSL_CLIENT)) { - useSslClient = EsUtil.getBooleanProperty(properties, USE_SSL_CLIENT); + if (properties.containsKey(ES_NET_SSL)) { + esNetSsl = EsUtil.getBoolean(properties, ES_NET_SSL); // check protocol for (String seed : seeds) { - if (useSslClient && seed.startsWith("http://")) { - throw new DdlException("if use_ssl_client is true, the https protocol must be used"); + if (esNetSsl && seed.startsWith("http://")) { + throw new DdlException("if es_net_ssl is true, the https protocol must be used"); } - if (!useSslClient && seed.startsWith("https://")) { - throw new DdlException("if use_ssl_client is false, the http protocol must be used"); + if (!esNetSsl && seed.startsWith("https://")) { + throw new DdlException("if es_net_ssl is false, the http protocol must be used"); } } } @@ -260,7 +260,7 @@ private void validate(Map properties) throws DdlException { tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff)); tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields)); tableContext.put(ES_NODES_DISCOVERY, String.valueOf(esNodesDiscovery)); - tableContext.put(USE_SSL_CLIENT, String.valueOf(useSslClient)); + tableContext.put(ES_NET_SSL, String.valueOf(esNetSsl)); } public TTableDescriptor toThrift() { @@ -348,10 +348,10 @@ public void readFields(DataInput in) throws IOException { } else { esNodesDiscovery = true; } - if (tableContext.containsKey(USE_SSL_CLIENT)) { - useSslClient = Boolean.parseBoolean(tableContext.get(USE_SSL_CLIENT)); + if (tableContext.containsKey(ES_NET_SSL)) { + esNetSsl = Boolean.parseBoolean(tableContext.get(ES_NET_SSL)); } else { - useSslClient = false; + esNetSsl = false; } PartitionType partType = PartitionType.valueOf(Text.readString(in)); if (partType == PartitionType.UNPARTITIONED) { @@ -387,7 +387,7 @@ public void readFields(DataInput in) throws IOException { tableContext.put("enableDocValueScan", "false"); tableContext.put(KEYWORD_SNIFF, "true"); tableContext.put(ES_NODES_DISCOVERY, "true"); - tableContext.put(USE_SSL_CLIENT, "false"); + tableContext.put(ES_NET_SSL, "false"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java index c41f8e06130f9a..3c98a12c168dd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java @@ -101,11 +101,8 @@ public EsNodeInfo(String id, Map map) { } } - public EsNodeInfo(String id, String seed, boolean useSslClient) { + public EsNodeInfo(String id, String seed) { this.id = id; - if (!seed.startsWith("http")) { - seed = useSslClient ? "https://" + seed : "http://" + seed; - } String[] scratch = seed.split(":"); int port = 80; if (scratch.length == 3) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java index 16d2c3d78ec43e..d6b7e48e5e3c0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java @@ -59,7 +59,7 @@ public void registerTable(EsTable esTable) { } esTables.put(esTable.getId(), esTable); esClients.put(esTable.getId(), - new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd(), esTable.isUseSslClient())); + new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd(), esTable.isEsNetSsl())); LOG.info("register a new table [{}] to sync list", esTable); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index 5f12f50ce6e9a1..7924fc0420a852 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -57,7 +57,7 @@ public class EsRestClient { mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false); } - private static OkHttpClient networkClient; + private OkHttpClient networkClient; private Request.Builder builder; private String[] nodes; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java index 09b4d7e3e5264b..454fb5fbc9706b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java @@ -87,7 +87,7 @@ public static JSONObject getJsonObject(JSONObject jsonObject, String key, int fr } } - public static boolean getBooleanProperty(Map properties, String name) throws DdlException { + public static boolean getBoolean(Map properties, String name) throws DdlException { String property = properties.get(name).trim(); try { return Boolean.parseBoolean(property); diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java index 97aeab04c044c8..ebb1d34b00834f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java @@ -43,9 +43,8 @@ public void execute(SearchContext context) throws DorisEsException { } else { nodesInfo = new HashMap<>(); String[] seeds = context.esTable().getSeeds(); - boolean useSSL = context.esTable().isUseSslClient(); for (int i = 0; i < seeds.length; i++) { - nodesInfo.put(String.valueOf(i), new EsNodeInfo(String.valueOf(i), seeds[i], useSSL)); + nodesInfo.put(String.valueOf(i), new EsNodeInfo(String.valueOf(i), seeds[i])); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java index 4662a887b0c5d2..6e3653d363c8e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -148,7 +148,7 @@ protected void toThrift(TPlanNode msg) { Map properties = Maps.newHashMap(); properties.put(EsTable.USER, table.getUserName()); properties.put(EsTable.PASSWORD, table.getPasswd()); - properties.put(EsTable.USE_SSL_CLIENT, String.valueOf(table.isUseSslClient())); + properties.put(EsTable.ES_NET_SSL, String.valueOf(table.isEsNetSsl())); TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt()); esScanNode.setProperties(properties); if (table.isDocValueScanEnable()) { From 465e0c0571dd52fe4b393db5bf7b3e0d66264346 Mon Sep 17 00:00:00 2001 From: stalary Date: Sun, 7 Mar 2021 11:40:46 +0800 Subject: [PATCH 07/13] ADD: add doc --- docs/en/extending-doris/doris-on-es.md | 57 ++++++++++++++++++++++ docs/zh-CN/extending-doris/doris-on-es.md | 58 +++++++++++++++++++++++ 2 files changed, 115 insertions(+) diff --git a/docs/en/extending-doris/doris-on-es.md b/docs/en/extending-doris/doris-on-es.md index 3a7209a16745a1..ea7bd82b2f21ee 100644 --- a/docs/en/extending-doris/doris-on-es.md +++ b/docs/en/extending-doris/doris-on-es.md @@ -328,6 +328,63 @@ This term does not match any term in the dictionary, and will not return any res The type of `k4.keyword` is `keyword`, and writing data into ES is a complete term, so it can be matched +### Enable ES node discovery(es\_nodes\_discovery=true) + +``` +CREATE EXTERNAL TABLE `test` ( + `k1` bigint(20) COMMENT "", + `k2` datetime COMMENT "", + `k3` varchar(20) COMMENT "", + `k4` varchar(100) COMMENT "", + `k5` float COMMENT "" +) ENGINE=ELASTICSEARCH +PROPERTIES ( +"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", +"index" = "test”, +"type" = "doc", +"user" = "root", +"password" = "root", + +"es_nodes_discovery" = "true" +); +``` + +Parameter Description: + +Parameter | Description +---|--- +**es\_nodes\_discovery** | Whether or not to enable ES node discovery. the default is true + +When enabled, Doris will find all available nodes from ES. If you only want Doris to access some nodes, you can turn this configuration off + +### Use SSL authentication(es\_net\_ssl=true) + +``` +CREATE EXTERNAL TABLE `test` ( + `k1` bigint(20) COMMENT "", + `k2` datetime COMMENT "", + `k3` varchar(20) COMMENT "", + `k4` varchar(100) COMMENT "", + `k5` float COMMENT "" +) ENGINE=ELASTICSEARCH +PROPERTIES ( +"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", +"index" = "test”, +"type" = "doc", +"user" = "root", +"password" = "root", + +"es_net_ssl" = "true" +); +``` + +Parameter Description: + +Parameter | Description +---|--- +**es\_net\_ssl** | SSL authentication is enabled when supporting HTTPS, the default is false + +The current FE/BE implementation is to trust all, this is a temporary solution, and the real user configuration certificate will be used later ### Query usage diff --git a/docs/zh-CN/extending-doris/doris-on-es.md b/docs/zh-CN/extending-doris/doris-on-es.md index a1c314829e5ce0..8a791ba2c9160f 100644 --- a/docs/zh-CN/extending-doris/doris-on-es.md +++ b/docs/zh-CN/extending-doris/doris-on-es.md @@ -325,6 +325,64 @@ POST /_analyze `k4.keyword` 的类型是`keyword`,数据写入ES中是一个完整的term,所以可以匹配 +### 开启es节点发现(es\_nodes\_discovery=true) + +``` +CREATE EXTERNAL TABLE `test` ( + `k1` bigint(20) COMMENT "", + `k2` datetime COMMENT "", + `k3` varchar(20) COMMENT "", + `k4` varchar(100) COMMENT "", + `k5` float COMMENT "" +) ENGINE=ELASTICSEARCH +PROPERTIES ( +"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", +"index" = "test”, +"type" = "doc", +"user" = "root", +"password" = "root", + +"es_nodes_discovery" = "true" +); +``` + +参数说明: + +参数 | 说明 +---|--- +**es\_nodes\_discovery** | 是否开启es节点发现,默认为true + +开启后Doris会从ES发现所有可用节点,如果只希望Doris访问部分节点,可以关闭此配置 + +### 使用ssl方式认证(es\_net\_ssl=true) + +``` +CREATE EXTERNAL TABLE `test` ( + `k1` bigint(20) COMMENT "", + `k2` datetime COMMENT "", + `k3` varchar(20) COMMENT "", + `k4` varchar(100) COMMENT "", + `k5` float COMMENT "" +) ENGINE=ELASTICSEARCH +PROPERTIES ( +"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", +"index" = "test”, +"type" = "doc", +"user" = "root", +"password" = "root", + +"es_net_ssl" = "true" +); +``` + +参数说明: + +参数 | 说明 +---|--- +**es\_net\_ssl** | 是否使用ssl,默认为false,当需要支持https时,需要开启此配置 + +目前会fe/be实现方式为信任所有,这是临时解决方案,后续会使用真实的用户配置证书 + ### 查询用法 完成在Doris中建立ES外表后,除了无法使用Doris中的数据模型(rollup、预聚合、物化视图等)外并无区别 From 0bbf7e141331be88e906d50cd6ace24f02834c95 Mon Sep 17 00:00:00 2001 From: stalary Date: Mon, 8 Mar 2021 13:22:13 +0800 Subject: [PATCH 08/13] MOD: modify networkClient use way, EsNodeInfo add useSslClient field --- .../external/elasticsearch/EsNodeInfo.java | 4 +- .../external/elasticsearch/EsRestClient.java | 37 +++++++++++-------- .../elasticsearch/PartitionPhase.java | 2 +- .../elasticsearch/PartitionPhaseTest.java | 4 +- 4 files changed, 27 insertions(+), 20 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java index 3c98a12c168dd0..e357ee56e6a6eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java @@ -43,7 +43,7 @@ public class EsNodeInfo { private static final Logger LOG = LogManager.getLogger(EsNodeInfo.class); - public EsNodeInfo(String id, Map map) { + public EsNodeInfo(String id, Map map, boolean useSslClient) { this.id = id; EsMajorVersion version = EsMajorVersion.parse((String) map.get("version")); this.name = (String) map.get("name"); @@ -71,7 +71,7 @@ public EsNodeInfo(String id, Map map) { String address = (String) httpMap.get("publish_address"); if (address != null) { String[] scratch = address.split(":"); - this.publishAddress = new TNetworkAddress(scratch[0], Integer.valueOf(scratch[1])); + this.publishAddress = new TNetworkAddress((useSslClient ? "https://" : "") + scratch[0], Integer.parseInt(scratch[1])); this.hasHttp = true; } else { this.publishAddress = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index 7924fc0420a852..1fe4122f009012 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -57,12 +57,15 @@ public class EsRestClient { mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false); } - private OkHttpClient networkClient; + private static OkHttpClient networkClient; + + private static OkHttpClient sslNetworkClient; private Request.Builder builder; private String[] nodes; private String currentNode; private int currentNodeIndex = 0; + private boolean useSslClient; public EsRestClient(String[] nodes, String authUser, String authPassword, boolean useSslClient) { this.nodes = nodes; @@ -72,17 +75,15 @@ public EsRestClient(String[] nodes, String authUser, String authPassword, boolea Credentials.basic(authUser, authPassword)); } this.currentNode = nodes[currentNodeIndex]; - if (useSslClient) { - networkClient = new OkHttpClient.Builder() - .readTimeout(10, TimeUnit.SECONDS) - .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts()) - .hostnameVerifier(new TrustAllHostnameVerifier()) - .build(); - } else { - networkClient = new OkHttpClient.Builder() - .readTimeout(10, TimeUnit.SECONDS) - .build(); - } + this.useSslClient = useSslClient; + sslNetworkClient = new OkHttpClient.Builder() + .readTimeout(10, TimeUnit.SECONDS) + .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts()) + .hostnameVerifier(new TrustAllHostnameVerifier()) + .build(); + networkClient = new OkHttpClient.Builder() + .readTimeout(10, TimeUnit.SECONDS) + .build(); } private void selectNextNode() { @@ -94,14 +95,14 @@ private void selectNextNode() { currentNode = nodes[currentNodeIndex]; } - public Map getHttpNodes() throws DorisEsException { + public Map getHttpNodes(boolean useSslClient) throws DorisEsException { Map> nodesData = get("_nodes/http", "nodes"); if (nodesData == null) { return Collections.emptyMap(); } Map nodesMap = new HashMap<>(); for (Map.Entry> entry : nodesData.entrySet()) { - EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); + EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), useSslClient); if (node.hasHttp()) { nodesMap.put(node.getId(), node); } @@ -169,6 +170,12 @@ public EsShardPartitions searchShards(String indexName) throws DorisEsException private String execute(String path) throws DorisEsException { int retrySize = nodes.length; DorisEsException scratchExceptionForThrow = null; + OkHttpClient httpClient; + if (useSslClient) { + httpClient = sslNetworkClient; + } else { + httpClient = networkClient; + } for (int i = 0; i < retrySize; i++) { // maybe should add HTTP schema to the address // actually, at this time we can only process http protocol @@ -188,7 +195,7 @@ private String execute(String path) throws DorisEsException { LOG.trace("es rest client request URL: {}", currentNode + "/" + path); } try { - response = networkClient.newCall(request).execute(); + response = httpClient.newCall(request).execute(); if (response.isSuccessful()) { return response.body().string(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java index ebb1d34b00834f..66173a01ca910c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java @@ -39,7 +39,7 @@ public PartitionPhase(EsRestClient client) { public void execute(SearchContext context) throws DorisEsException { shardPartitions = client.searchShards(context.sourceIndex()); if (context.esNodesDiscovery()) { - nodesInfo = client.getHttpNodes(); + nodesInfo = client.getHttpNodes(context.esTable().isEsNetSsl()); } else { nodesInfo = new HashMap<>(); String[] seeds = context.esTable().getSeeds(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java index aa82dff94e34e6..27a26d155677e9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java @@ -50,7 +50,7 @@ public void testWorkFlow(@Injectable EsRestClient client) throws Exception { Map> nodesData = (Map>) mapper.readValue(jsonParser, Map.class).get("nodes"); Map nodesMap = new HashMap<>(); for (Map.Entry> entry : nodesData.entrySet()) { - EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); + EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), false); if (node.hasHttp()) { nodesMap.put(node.getId(), node); } @@ -58,7 +58,7 @@ public void testWorkFlow(@Injectable EsRestClient client) throws Exception { new Expectations(client) { { - client.getHttpNodes(); + client.getHttpNodes(false); minTimes = 0; result = nodesMap; From f0925b40df77b1eabb5bee0669da9ebedd399c23 Mon Sep 17 00:00:00 2001 From: stalary Date: Sun, 21 Mar 2021 23:17:00 +0800 Subject: [PATCH 09/13] MOD: modify name --- be/src/exec/es/es_scan_reader.cpp | 4 +- be/src/exec/es/es_scan_reader.h | 2 +- docs/en/extending-doris/doris-on-es.md | 4 +- docs/zh-CN/extending-doris/doris-on-es.md | 4 +- .../org/apache/doris/catalog/Catalog.java | 4 +- .../org/apache/doris/catalog/EsTable.java | 52 +++++++++---------- .../external/elasticsearch/EsNodeInfo.java | 4 +- .../external/elasticsearch/EsRepository.java | 2 +- .../external/elasticsearch/EsRestClient.java | 23 ++++---- .../elasticsearch/EsShardPartitions.java | 12 +++-- .../elasticsearch/EsTablePartitions.java | 18 ------- .../elasticsearch/PartitionPhase.java | 6 +-- .../external/elasticsearch/SearchContext.java | 8 +-- .../org/apache/doris/planner/EsScanNode.java | 2 +- .../elasticsearch/PartitionPhaseTest.java | 4 +- 15 files changed, 69 insertions(+), 80 deletions(-) diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp index a0a139d10ecae2..87131a76476b4b 100644 --- a/be/src/exec/es/es_scan_reader.cpp +++ b/be/src/exec/es/es_scan_reader.cpp @@ -60,8 +60,8 @@ ESScanReader::ESScanReader(const std::string& target, if (props.find(KEY_QUERY) != props.end()) { _query = props.at(KEY_QUERY); } - if (props.find(KEY_ES_NET_SSL) != props.end()) { - std::istringstream(props.at(KEY_ES_NET_SSL)) >> std::boolalpha >> _use_ssl_client; + if (props.find(KEY_HTTP_SSL_ENABLED) != props.end()) { + std::istringstream(props.at(KEY_HTTP_SSL_ENABLED)) >> std::boolalpha >> _use_ssl_client; } std::string batch_size_str = props.at(KEY_BATCH_SIZE); diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h index 400de4dd130c53..f162dc3bca36be 100644 --- a/be/src/exec/es/es_scan_reader.h +++ b/be/src/exec/es/es_scan_reader.h @@ -40,7 +40,7 @@ class ESScanReader { static constexpr const char* KEY_BATCH_SIZE = "batch_size"; static constexpr const char* KEY_TERMINATE_AFTER = "limit"; static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode"; - static constexpr const char* KEY_ES_NET_SSL = "es_net_ssl"; + static constexpr const char* KEY_HTTP_SSL_ENABLED = "http_ssl_enabled"; ESScanReader(const std::string& target, const std::map& props, bool doc_value_mode); ~ESScanReader(); diff --git a/docs/en/extending-doris/doris-on-es.md b/docs/en/extending-doris/doris-on-es.md index ea7bd82b2f21ee..6582d808156123 100644 --- a/docs/en/extending-doris/doris-on-es.md +++ b/docs/en/extending-doris/doris-on-es.md @@ -345,7 +345,7 @@ PROPERTIES ( "user" = "root", "password" = "root", -"es_nodes_discovery" = "true" +"nodes_discovery" = "true" ); ``` @@ -374,7 +374,7 @@ PROPERTIES ( "user" = "root", "password" = "root", -"es_net_ssl" = "true" +"http_ssl_enabled" = "true" ); ``` diff --git a/docs/zh-CN/extending-doris/doris-on-es.md b/docs/zh-CN/extending-doris/doris-on-es.md index 8a791ba2c9160f..45da62ac8c3f5c 100644 --- a/docs/zh-CN/extending-doris/doris-on-es.md +++ b/docs/zh-CN/extending-doris/doris-on-es.md @@ -342,7 +342,7 @@ PROPERTIES ( "user" = "root", "password" = "root", -"es_nodes_discovery" = "true" +"nodes_discovery" = "true" ); ``` @@ -371,7 +371,7 @@ PROPERTIES ( "user" = "root", "password" = "root", -"es_net_ssl" = "true" +"http_ssl_enabled" = "true" ); ``` diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 423ef93bd1982d..a4b5b626bdf81c 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4150,8 +4150,8 @@ public static void getDdlStmt(String dbName, Table table, List createTab sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n"); sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n"); sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n"); - sb.append("\"es_nodes_discovery\" = \"").append(esTable.isEsNodesDiscovery()).append("\"\n"); - sb.append("\"es_net_ssl\" = \"").append(esTable.isEsNetSsl()).append("\"\n"); + sb.append("\"nodes_discovery\" = \"").append(esTable.isNodesDiscovery()).append("\"\n"); + sb.append("\"http_ssl_enabled\" = \"").append(esTable.isHttpSslEnabled()).append("\"\n"); sb.append(")"); } else if (table.getType() == TableType.HIVE) { HiveTable hiveTable = (HiveTable) table; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java index cb8bd72b6bb77a..cbd2b7786d988c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java @@ -64,8 +64,8 @@ public class EsTable extends Table { public static final String DOC_VALUE_SCAN = "enable_docvalue_scan"; public static final String KEYWORD_SNIFF = "enable_keyword_sniff"; public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields"; - public static final String ES_NODES_DISCOVERY = "es_nodes_discovery"; - public static final String ES_NET_SSL = "es_net_ssl"; + public static final String NODES_DISCOVERY = "nodes_discovery"; + public static final String HTTP_SSL_ENABLED = "http_ssl_enabled"; private String hosts; private String[] seeds; @@ -90,9 +90,9 @@ public class EsTable extends Table { // would downgrade to extract value from `stored_fields` private int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS; - private boolean esNodesDiscovery = true; + private boolean nodesDiscovery = true; - private boolean esNetSsl = false; + private boolean httpSslEnabled = false; // Solr doc_values vs stored_fields performance-smackdown indicate: // It is possible to notice that retrieving an high number of fields leads @@ -145,12 +145,12 @@ public boolean isKeywordSniffEnable() { return enableKeywordSniff; } - public boolean isEsNodesDiscovery() { - return esNodesDiscovery; + public boolean isNodesDiscovery() { + return nodesDiscovery; } - public boolean isEsNetSsl() { - return esNetSsl; + public boolean isHttpSslEnabled() { + return httpSslEnabled; } private void validate(Map properties) throws DdlException { @@ -206,19 +206,19 @@ private void validate(Map properties) throws DdlException { enableKeywordSniff = EsUtil.getBoolean(properties, KEYWORD_SNIFF); } - if (properties.containsKey(ES_NODES_DISCOVERY)) { - esNodesDiscovery = EsUtil.getBoolean(properties, ES_NODES_DISCOVERY); + if (properties.containsKey(NODES_DISCOVERY)) { + nodesDiscovery = EsUtil.getBoolean(properties, NODES_DISCOVERY); } - if (properties.containsKey(ES_NET_SSL)) { - esNetSsl = EsUtil.getBoolean(properties, ES_NET_SSL); + if (properties.containsKey(HTTP_SSL_ENABLED)) { + httpSslEnabled = EsUtil.getBoolean(properties, HTTP_SSL_ENABLED); // check protocol for (String seed : seeds) { - if (esNetSsl && seed.startsWith("http://")) { - throw new DdlException("if es_net_ssl is true, the https protocol must be used"); + if (httpSslEnabled && seed.startsWith("http://")) { + throw new DdlException("if http_ssl_enabled is true, the https protocol must be used"); } - if (!esNetSsl && seed.startsWith("https://")) { - throw new DdlException("if es_net_ssl is false, the http protocol must be used"); + if (!httpSslEnabled && seed.startsWith("https://")) { + throw new DdlException("if http_ssl_enabled is false, the http protocol must be used"); } } } @@ -259,8 +259,8 @@ private void validate(Map properties) throws DdlException { tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan)); tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff)); tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields)); - tableContext.put(ES_NODES_DISCOVERY, String.valueOf(esNodesDiscovery)); - tableContext.put(ES_NET_SSL, String.valueOf(esNetSsl)); + tableContext.put(NODES_DISCOVERY, String.valueOf(nodesDiscovery)); + tableContext.put(HTTP_SSL_ENABLED, String.valueOf(httpSslEnabled)); } public TTableDescriptor toThrift() { @@ -343,15 +343,15 @@ public void readFields(DataInput in) throws IOException { maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS; } } - if (tableContext.containsKey(ES_NODES_DISCOVERY)) { - esNodesDiscovery = Boolean.parseBoolean(tableContext.get(ES_NODES_DISCOVERY)); + if (tableContext.containsKey(NODES_DISCOVERY)) { + nodesDiscovery = Boolean.parseBoolean(tableContext.get(NODES_DISCOVERY)); } else { - esNodesDiscovery = true; + nodesDiscovery = true; } - if (tableContext.containsKey(ES_NET_SSL)) { - esNetSsl = Boolean.parseBoolean(tableContext.get(ES_NET_SSL)); + if (tableContext.containsKey(HTTP_SSL_ENABLED)) { + httpSslEnabled = Boolean.parseBoolean(tableContext.get(HTTP_SSL_ENABLED)); } else { - esNetSsl = false; + httpSslEnabled = false; } PartitionType partType = PartitionType.valueOf(Text.readString(in)); if (partType == PartitionType.UNPARTITIONED) { @@ -386,8 +386,8 @@ public void readFields(DataInput in) throws IOException { tableContext.put("transport", transport); tableContext.put("enableDocValueScan", "false"); tableContext.put(KEYWORD_SNIFF, "true"); - tableContext.put(ES_NODES_DISCOVERY, "true"); - tableContext.put(ES_NET_SSL, "false"); + tableContext.put(NODES_DISCOVERY, "true"); + tableContext.put(HTTP_SSL_ENABLED, "false"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java index e357ee56e6a6eb..a1e62d589775d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java @@ -43,7 +43,7 @@ public class EsNodeInfo { private static final Logger LOG = LogManager.getLogger(EsNodeInfo.class); - public EsNodeInfo(String id, Map map, boolean useSslClient) { + public EsNodeInfo(String id, Map map) { this.id = id; EsMajorVersion version = EsMajorVersion.parse((String) map.get("version")); this.name = (String) map.get("name"); @@ -71,7 +71,7 @@ public EsNodeInfo(String id, Map map, boolean useSslClient) { String address = (String) httpMap.get("publish_address"); if (address != null) { String[] scratch = address.split(":"); - this.publishAddress = new TNetworkAddress((useSslClient ? "https://" : "") + scratch[0], Integer.parseInt(scratch[1])); + this.publishAddress = new TNetworkAddress(scratch[0], Integer.parseInt(scratch[1])); this.hasHttp = true; } else { this.publishAddress = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java index d6b7e48e5e3c0c..2d94ee8df10551 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java @@ -59,7 +59,7 @@ public void registerTable(EsTable esTable) { } esTables.put(esTable.getId(), esTable); esClients.put(esTable.getId(), - new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd(), esTable.isEsNetSsl())); + new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd(), esTable.isHttpSslEnabled())); LOG.info("register a new table [{}] to sync list", esTable); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index 1fe4122f009012..8615719eeda1c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -57,7 +57,9 @@ public class EsRestClient { mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false); } - private static OkHttpClient networkClient; + private static OkHttpClient networkClient = new OkHttpClient.Builder() + .readTimeout(10, TimeUnit.SECONDS) + .build(); private static OkHttpClient sslNetworkClient; @@ -76,14 +78,6 @@ public EsRestClient(String[] nodes, String authUser, String authPassword, boolea } this.currentNode = nodes[currentNodeIndex]; this.useSslClient = useSslClient; - sslNetworkClient = new OkHttpClient.Builder() - .readTimeout(10, TimeUnit.SECONDS) - .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts()) - .hostnameVerifier(new TrustAllHostnameVerifier()) - .build(); - networkClient = new OkHttpClient.Builder() - .readTimeout(10, TimeUnit.SECONDS) - .build(); } private void selectNextNode() { @@ -95,14 +89,14 @@ private void selectNextNode() { currentNode = nodes[currentNodeIndex]; } - public Map getHttpNodes(boolean useSslClient) throws DorisEsException { + public Map getHttpNodes() throws DorisEsException { Map> nodesData = get("_nodes/http", "nodes"); if (nodesData == null) { return Collections.emptyMap(); } Map nodesMap = new HashMap<>(); for (Map.Entry> entry : nodesData.entrySet()) { - EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), useSslClient); + EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); if (node.hasHttp()) { nodesMap.put(node.getId(), node); } @@ -172,6 +166,13 @@ private String execute(String path) throws DorisEsException { DorisEsException scratchExceptionForThrow = null; OkHttpClient httpClient; if (useSslClient) { + if (sslNetworkClient == null) { + sslNetworkClient = new OkHttpClient.Builder() + .readTimeout(10, TimeUnit.SECONDS) + .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts()) + .hostnameVerifier(new TrustAllHostnameVerifier()) + .build(); + } httpClient = sslNetworkClient; } else { httpClient = networkClient; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java index b81deade16cf05..061b9133586701 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java @@ -94,16 +94,22 @@ public static EsShardPartitions findShardPartitions(String indexName, String sea return partitions; } - public void addHttpAddress(Map nodesInfo) { + public void addHttpAddress(Map nodesInfo, boolean httpSslEnabled) { for (Map.Entry> entry : shardRoutings.entrySet()) { List shardRoutings = entry.getValue(); for (EsShardRouting shardRouting : shardRoutings) { String nodeId = shardRouting.getNodeId(); + TNetworkAddress httpAddress; if (nodesInfo.containsKey(nodeId)) { - shardRouting.setHttpAddress(nodesInfo.get(nodeId).getPublishAddress()); + httpAddress = nodesInfo.get(nodeId).getPublishAddress(); } else { - shardRouting.setHttpAddress(randomAddress(nodesInfo)); + httpAddress = randomAddress(nodesInfo); } + // If ssl is enabled, determine if the https protocol is required + if (httpSslEnabled && !httpAddress.getHostname().startsWith("http")) { + httpAddress.setHostname("https://" + httpAddress.getHostname()); + } + shardRouting.setHttpAddress(httpAddress); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java index 05c4fe08b55347..d6bc5e3d1ca527 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java @@ -24,7 +24,6 @@ import org.apache.doris.catalog.RangePartitionInfo; import org.apache.doris.catalog.SinglePartitionInfo; import org.apache.doris.common.DdlException; -import org.apache.doris.thrift.TNetworkAddress; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,7 +35,6 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Random; /** * save the dynamic info parsed from es cluster state such as shard routing, partition info @@ -109,22 +107,6 @@ public static EsTablePartitions fromShardPartitions(EsTable esTable, EsShardPart } return esTablePartitions; } - - public void addHttpAddress(Map nodesInfo) { - for (EsShardPartitions indexState : partitionedIndexStates.values()) { - indexState.addHttpAddress(nodesInfo); - } - for (EsShardPartitions indexState : unPartitionedIndexStates.values()) { - indexState.addHttpAddress(nodesInfo); - } - - } - - public TNetworkAddress randomAddress(Map nodesInfo) { - int seed = new Random().nextInt() % nodesInfo.size(); - EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray(); - return nodeInfos[seed].getPublishAddress(); - } public PartitionInfo getPartitionInfo() { return partitionInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java index 66173a01ca910c..8c9e1f98ca6028 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java @@ -38,8 +38,8 @@ public PartitionPhase(EsRestClient client) { @Override public void execute(SearchContext context) throws DorisEsException { shardPartitions = client.searchShards(context.sourceIndex()); - if (context.esNodesDiscovery()) { - nodesInfo = client.getHttpNodes(context.esTable().isEsNetSsl()); + if (context.nodesDiscovery()) { + nodesInfo = client.getHttpNodes(); } else { nodesInfo = new HashMap<>(); String[] seeds = context.esTable().getSeeds(); @@ -54,7 +54,7 @@ public void execute(SearchContext context) throws DorisEsException { public void postProcess(SearchContext context) throws DorisEsException { context.partitions(shardPartitions); if (EsTable.TRANSPORT_HTTP.equals(context.esTable().getTransport())) { - context.partitions().addHttpAddress(nodesInfo); + context.partitions().addHttpAddress(nodesInfo, context.esTable().isHttpSslEnabled()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java index f306415ec61b33..3e9e03dc9f2b50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java @@ -85,7 +85,7 @@ public class SearchContext { private EsMajorVersion version; // whether the nodes needs to be discovered - private boolean esNodesDiscovery; + private boolean nodesDiscovery; public SearchContext(EsTable table) { @@ -93,7 +93,7 @@ public SearchContext(EsTable table) { fullSchema = table.getFullSchema(); sourceIndex = table.getIndexName(); type = table.getMappingType(); - esNodesDiscovery = table.isEsNodesDiscovery(); + nodesDiscovery = table.isNodesDiscovery(); } @@ -147,7 +147,7 @@ public EsTablePartitions tablePartitions() throws Exception { return EsTablePartitions.fromShardPartitions(table, shardPartitions); } - public boolean esNodesDiscovery() { - return esNodesDiscovery; + public boolean nodesDiscovery() { + return nodesDiscovery; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java index 6e3653d363c8e8..e662de08931f1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -148,7 +148,7 @@ protected void toThrift(TPlanNode msg) { Map properties = Maps.newHashMap(); properties.put(EsTable.USER, table.getUserName()); properties.put(EsTable.PASSWORD, table.getPasswd()); - properties.put(EsTable.ES_NET_SSL, String.valueOf(table.isEsNetSsl())); + properties.put(EsTable.HTTP_SSL_ENABLED, String.valueOf(table.isHttpSslEnabled())); TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt()); esScanNode.setProperties(properties); if (table.isDocValueScanEnable()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java index 27a26d155677e9..aa82dff94e34e6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java @@ -50,7 +50,7 @@ public void testWorkFlow(@Injectable EsRestClient client) throws Exception { Map> nodesData = (Map>) mapper.readValue(jsonParser, Map.class).get("nodes"); Map nodesMap = new HashMap<>(); for (Map.Entry> entry : nodesData.entrySet()) { - EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), false); + EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); if (node.hasHttp()) { nodesMap.put(node.getId(), node); } @@ -58,7 +58,7 @@ public void testWorkFlow(@Injectable EsRestClient client) throws Exception { new Expectations(client) { { - client.getHttpNodes(false); + client.getHttpNodes(); minTimes = 0; result = nodesMap; From 76ac0150d3d37cca5b4f640c6c93c72f94ee68b1 Mon Sep 17 00:00:00 2001 From: stalary Date: Sat, 3 Apr 2021 18:18:30 +0800 Subject: [PATCH 10/13] MOD: modify some code --- docs/en/extending-doris/doris-on-es.md | 4 +-- docs/zh-CN/extending-doris/doris-on-es.md | 4 +-- .../external/elasticsearch/EsNodeInfo.java | 4 +-- .../external/elasticsearch/EsRestClient.java | 31 ++++++++++++------- .../elasticsearch/EsShardPartitions.java | 12 ++----- .../elasticsearch/PartitionPhase.java | 2 +- .../elasticsearch/PartitionPhaseTest.java | 2 +- 7 files changed, 30 insertions(+), 29 deletions(-) diff --git a/docs/en/extending-doris/doris-on-es.md b/docs/en/extending-doris/doris-on-es.md index 6582d808156123..87e9feb712a1b0 100644 --- a/docs/en/extending-doris/doris-on-es.md +++ b/docs/en/extending-doris/doris-on-es.md @@ -328,7 +328,7 @@ This term does not match any term in the dictionary, and will not return any res The type of `k4.keyword` is `keyword`, and writing data into ES is a complete term, so it can be matched -### Enable ES node discovery(es\_nodes\_discovery=true) +### Enable node discovery mechanism, default is true(es\_nodes\_discovery=true) ``` CREATE EXTERNAL TABLE `test` ( @@ -357,7 +357,7 @@ Parameter | Description When enabled, Doris will find all available nodes from ES. If you only want Doris to access some nodes, you can turn this configuration off -### Use SSL authentication(es\_net\_ssl=true) +### Enable SSL protocol when making an HTTP request, default is false(http\_ssl\_enable=true) ``` CREATE EXTERNAL TABLE `test` ( diff --git a/docs/zh-CN/extending-doris/doris-on-es.md b/docs/zh-CN/extending-doris/doris-on-es.md index 45da62ac8c3f5c..f0c6f52a6367b0 100644 --- a/docs/zh-CN/extending-doris/doris-on-es.md +++ b/docs/zh-CN/extending-doris/doris-on-es.md @@ -325,7 +325,7 @@ POST /_analyze `k4.keyword` 的类型是`keyword`,数据写入ES中是一个完整的term,所以可以匹配 -### 开启es节点发现(es\_nodes\_discovery=true) +### 开启节点自动发现, 默认为true(es\_nodes\_discovery=true) ``` CREATE EXTERNAL TABLE `test` ( @@ -354,7 +354,7 @@ PROPERTIES ( 开启后Doris会从ES发现所有可用节点,如果只希望Doris访问部分节点,可以关闭此配置 -### 使用ssl方式认证(es\_net\_ssl=true) +### 在发起http请求时开启ssl协议, 默认为false(es\_net\_ssl=true) ``` CREATE EXTERNAL TABLE `test` ( diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java index a1e62d589775d4..31bfc5074a3d0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java @@ -43,7 +43,7 @@ public class EsNodeInfo { private static final Logger LOG = LogManager.getLogger(EsNodeInfo.class); - public EsNodeInfo(String id, Map map) { + public EsNodeInfo(String id, Map map, boolean httpSslEnabled) { this.id = id; EsMajorVersion version = EsMajorVersion.parse((String) map.get("version")); this.name = (String) map.get("name"); @@ -71,7 +71,7 @@ public EsNodeInfo(String id, Map map) { String address = (String) httpMap.get("publish_address"); if (address != null) { String[] scratch = address.split(":"); - this.publishAddress = new TNetworkAddress(scratch[0], Integer.parseInt(scratch[1])); + this.publishAddress = new TNetworkAddress((httpSslEnabled ? "https://" : "") + scratch[0], Integer.parseInt(scratch[1])); this.hasHttp = true; } else { this.publishAddress = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index 8615719eeda1c2..6427d64088d18a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -67,9 +67,9 @@ public class EsRestClient { private String[] nodes; private String currentNode; private int currentNodeIndex = 0; - private boolean useSslClient; + private boolean httpSslEnable; - public EsRestClient(String[] nodes, String authUser, String authPassword, boolean useSslClient) { + public EsRestClient(String[] nodes, String authUser, String authPassword, boolean httpSslEnable) { this.nodes = nodes; this.builder = new Request.Builder(); if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) { @@ -77,7 +77,7 @@ public EsRestClient(String[] nodes, String authUser, String authPassword, boolea Credentials.basic(authUser, authPassword)); } this.currentNode = nodes[currentNodeIndex]; - this.useSslClient = useSslClient; + this.httpSslEnable = httpSslEnable; } private void selectNextNode() { @@ -96,7 +96,7 @@ public Map getHttpNodes() throws DorisEsException { } Map nodesMap = new HashMap<>(); for (Map.Entry> entry : nodesData.entrySet()) { - EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); + EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), httpSslEnable); if (node.hasHttp()) { nodesMap.put(node.getId(), node); } @@ -154,6 +154,19 @@ public EsShardPartitions searchShards(String indexName) throws DorisEsException } return EsShardPartitions.findShardPartitions(indexName, searchShards); } + + /** + * init ssl networkClient use lazy way + **/ + private synchronized void initSslNetworkClient() { + if (sslNetworkClient == null) { + sslNetworkClient = new OkHttpClient.Builder() + .readTimeout(10, TimeUnit.SECONDS) + .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts()) + .hostnameVerifier(new TrustAllHostnameVerifier()) + .build(); + } + } /** * execute request for specific path,it will try again nodes.length times if it fails @@ -165,14 +178,8 @@ private String execute(String path) throws DorisEsException { int retrySize = nodes.length; DorisEsException scratchExceptionForThrow = null; OkHttpClient httpClient; - if (useSslClient) { - if (sslNetworkClient == null) { - sslNetworkClient = new OkHttpClient.Builder() - .readTimeout(10, TimeUnit.SECONDS) - .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts()) - .hostnameVerifier(new TrustAllHostnameVerifier()) - .build(); - } + if (httpSslEnable) { + initSslNetworkClient(); httpClient = sslNetworkClient; } else { httpClient = networkClient; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java index 061b9133586701..b81deade16cf05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java @@ -94,22 +94,16 @@ public static EsShardPartitions findShardPartitions(String indexName, String sea return partitions; } - public void addHttpAddress(Map nodesInfo, boolean httpSslEnabled) { + public void addHttpAddress(Map nodesInfo) { for (Map.Entry> entry : shardRoutings.entrySet()) { List shardRoutings = entry.getValue(); for (EsShardRouting shardRouting : shardRoutings) { String nodeId = shardRouting.getNodeId(); - TNetworkAddress httpAddress; if (nodesInfo.containsKey(nodeId)) { - httpAddress = nodesInfo.get(nodeId).getPublishAddress(); + shardRouting.setHttpAddress(nodesInfo.get(nodeId).getPublishAddress()); } else { - httpAddress = randomAddress(nodesInfo); + shardRouting.setHttpAddress(randomAddress(nodesInfo)); } - // If ssl is enabled, determine if the https protocol is required - if (httpSslEnabled && !httpAddress.getHostname().startsWith("http")) { - httpAddress.setHostname("https://" + httpAddress.getHostname()); - } - shardRouting.setHttpAddress(httpAddress); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java index 8c9e1f98ca6028..bb5d416f56cb08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java @@ -54,7 +54,7 @@ public void execute(SearchContext context) throws DorisEsException { public void postProcess(SearchContext context) throws DorisEsException { context.partitions(shardPartitions); if (EsTable.TRANSPORT_HTTP.equals(context.esTable().getTransport())) { - context.partitions().addHttpAddress(nodesInfo, context.esTable().isHttpSslEnabled()); + context.partitions().addHttpAddress(nodesInfo); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java index aa82dff94e34e6..ae858017ed23d8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java @@ -50,7 +50,7 @@ public void testWorkFlow(@Injectable EsRestClient client) throws Exception { Map> nodesData = (Map>) mapper.readValue(jsonParser, Map.class).get("nodes"); Map nodesMap = new HashMap<>(); for (Map.Entry> entry : nodesData.entrySet()) { - EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); + EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), false); if (node.hasHttp()) { nodesMap.put(node.getId(), node); } From b75950bf05142ace41d00c0319949a244707f3db Mon Sep 17 00:00:00 2001 From: stalary Date: Fri, 9 Apr 2021 09:30:47 +0800 Subject: [PATCH 11/13] MOD: modify doc --- docs/en/extending-doris/doris-on-es.md | 4 ++-- docs/zh-CN/extending-doris/doris-on-es.md | 4 ++-- .../apache/doris/external/elasticsearch/EsRestClient.java | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/en/extending-doris/doris-on-es.md b/docs/en/extending-doris/doris-on-es.md index 87e9feb712a1b0..2c5e6638ada01f 100644 --- a/docs/en/extending-doris/doris-on-es.md +++ b/docs/en/extending-doris/doris-on-es.md @@ -355,9 +355,9 @@ Parameter | Description ---|--- **es\_nodes\_discovery** | Whether or not to enable ES node discovery. the default is true -When enabled, Doris will find all available nodes from ES. If you only want Doris to access some nodes, you can turn this configuration off +Doris would find all available related data nodes (shards allocated on)from ES when this is true. Just set false if address of ES data nodes are not accessed by Doris BE, eg. the ES cluster is deployed in the intranet which isolated from your public Internet, and users access through a proxy -### Enable SSL protocol when making an HTTP request, default is false(http\_ssl\_enable=true) +### Whether ES cluster enables https access mode, if enabled should set value with`true`, default is false(http\_ssl\_enable=true) ``` CREATE EXTERNAL TABLE `test` ( diff --git a/docs/zh-CN/extending-doris/doris-on-es.md b/docs/zh-CN/extending-doris/doris-on-es.md index f0c6f52a6367b0..14cfb106740b3b 100644 --- a/docs/zh-CN/extending-doris/doris-on-es.md +++ b/docs/zh-CN/extending-doris/doris-on-es.md @@ -352,9 +352,9 @@ PROPERTIES ( ---|--- **es\_nodes\_discovery** | 是否开启es节点发现,默认为true -开启后Doris会从ES发现所有可用节点,如果只希望Doris访问部分节点,可以关闭此配置 +当配置为true时,Doris将从ES找到所有可用的相关数据节点(在上面分配的分片)。如果ES数据节点的地址没有被Doris BE访问,则设置为false。ES集群部署在与公共Internet隔离的内网,用户通过代理访问 -### 在发起http请求时开启ssl协议, 默认为false(es\_net\_ssl=true) +### ES集群是否开启https访问模式,如果开启应设置为`true`,默认为false(es\_net\_ssl=true) ``` CREATE EXTERNAL TABLE `test` ( diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index 6427d64088d18a..7e1983a625e180 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -158,7 +158,7 @@ public EsShardPartitions searchShards(String indexName) throws DorisEsException /** * init ssl networkClient use lazy way **/ - private synchronized void initSslNetworkClient() { + private synchronized OkHttpClient getOrCreateSslNetworkClient() { if (sslNetworkClient == null) { sslNetworkClient = new OkHttpClient.Builder() .readTimeout(10, TimeUnit.SECONDS) @@ -166,6 +166,7 @@ private synchronized void initSslNetworkClient() { .hostnameVerifier(new TrustAllHostnameVerifier()) .build(); } + return sslNetworkClient; } /** @@ -179,8 +180,7 @@ private String execute(String path) throws DorisEsException { DorisEsException scratchExceptionForThrow = null; OkHttpClient httpClient; if (httpSslEnable) { - initSslNetworkClient(); - httpClient = sslNetworkClient; + httpClient = getOrCreateSslNetworkClient(); } else { httpClient = networkClient; } @@ -265,7 +265,7 @@ private static SSLSocketFactory createSSLSocketFactory() { sc.init(null, new TrustManager[]{new TrustAllCerts()}, new SecureRandom()); ssfFactory = sc.getSocketFactory(); } catch (Exception e) { - throw new DorisEsException("createSSLSocketFactory error"); + throw new DorisEsException("Errors happens when create ssl socket"); } return ssfFactory; } From 94458e82a07ba059376708db443093be5b1b3cb2 Mon Sep 17 00:00:00 2001 From: stalary Date: Fri, 9 Apr 2021 09:35:08 +0800 Subject: [PATCH 12/13] MOD: modify doc --- docs/en/extending-doris/doris-on-es.md | 2 +- docs/zh-CN/extending-doris/doris-on-es.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/extending-doris/doris-on-es.md b/docs/en/extending-doris/doris-on-es.md index 2c5e6638ada01f..9be1bedb67f219 100644 --- a/docs/en/extending-doris/doris-on-es.md +++ b/docs/en/extending-doris/doris-on-es.md @@ -382,7 +382,7 @@ Parameter Description: Parameter | Description ---|--- -**es\_net\_ssl** | SSL authentication is enabled when supporting HTTPS, the default is false +**http\_ssl\_enabled** | Whether ES cluster enables https access mode The current FE/BE implementation is to trust all, this is a temporary solution, and the real user configuration certificate will be used later diff --git a/docs/zh-CN/extending-doris/doris-on-es.md b/docs/zh-CN/extending-doris/doris-on-es.md index 14cfb106740b3b..a8daa8fdf37630 100644 --- a/docs/zh-CN/extending-doris/doris-on-es.md +++ b/docs/zh-CN/extending-doris/doris-on-es.md @@ -379,7 +379,7 @@ PROPERTIES ( 参数 | 说明 ---|--- -**es\_net\_ssl** | 是否使用ssl,默认为false,当需要支持https时,需要开启此配置 +**http\_ssl\_enabled** | ES集群是否开启https访问模式 目前会fe/be实现方式为信任所有,这是临时解决方案,后续会使用真实的用户配置证书 From 57e5c3228880fab293c935c6adf9581a08917685 Mon Sep 17 00:00:00 2001 From: stalary Date: Fri, 9 Apr 2021 09:44:26 +0800 Subject: [PATCH 13/13] MOD: modify doc --- docs/zh-CN/extending-doris/doris-on-es.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh-CN/extending-doris/doris-on-es.md b/docs/zh-CN/extending-doris/doris-on-es.md index a8daa8fdf37630..4a046f08f83f8a 100644 --- a/docs/zh-CN/extending-doris/doris-on-es.md +++ b/docs/zh-CN/extending-doris/doris-on-es.md @@ -354,7 +354,7 @@ PROPERTIES ( 当配置为true时,Doris将从ES找到所有可用的相关数据节点(在上面分配的分片)。如果ES数据节点的地址没有被Doris BE访问,则设置为false。ES集群部署在与公共Internet隔离的内网,用户通过代理访问 -### ES集群是否开启https访问模式,如果开启应设置为`true`,默认为false(es\_net\_ssl=true) +### ES集群是否开启https访问模式,如果开启应设置为`true`,默认为false(http\_ssl\_enabled=true) ``` CREATE EXTERNAL TABLE `test` (