From bbefc813f3671589fc43e990c82749d6ce3b66c5 Mon Sep 17 00:00:00 2001 From: Kay Schubert Date: Mon, 12 Oct 2020 10:16:00 +0200 Subject: [PATCH] Fix http connection reuse At Datameer, we have had failing communication between client and server when the client reused HTTP connections. When I looked at the code, I noticed that the author(s) of the ConnectionKeepAliveStrategy seemed to think that it does what is actually the purpose of a ConnectionReuseStrategy. The comments within getKeepAliveDuration(..) suggest that they thought that by returning -1 they disable connection reuse (it actually means "keep the connection alive indefinitely"). However, I'm not sure if that is the real root cause of our issues regarding failing communication between client and server when connections are reused, so I designed the new code so that connections are never reused unless told to via property. --- .../ClickHouseConnectionSettings.java | 1 + .../settings/ClickHouseProperties.java | 13 +++++++- .../util/ClickHouseHttpClientBuilder.java | 33 ++++++++++++++----- 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java index 202574fed..c4451efe7 100644 --- a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java +++ b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java @@ -28,6 +28,7 @@ public enum ClickHouseConnectionSettings implements DriverPropertyCreator { KEEP_ALIVE_TIMEOUT("keepAliveTimeout", 30 * 1000, ""), + REUSE_CONNECTIONS("reuseConnections", false, ""), /** * for ConnectionManager diff --git a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java index ecf08fa4e..dea27b4f3 100644 --- a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java +++ b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java @@ -93,7 +93,7 @@ public class ClickHouseProperties { private Boolean insertDeduplicate; private Boolean insertDistributedSync; private Boolean anyJoinDistinctRightTableKeys; - + private boolean reuseConnections; public ClickHouseProperties() { this(new Properties()); @@ -108,6 +108,7 @@ public ClickHouseProperties(Properties info) { this.connectionTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.CONNECTION_TIMEOUT); this.dataTransferTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.DATA_TRANSFER_TIMEOUT); this.keepAliveTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.KEEP_ALIVE_TIMEOUT); + this.reuseConnections = (Boolean)getSetting(info, ClickHouseConnectionSettings.REUSE_CONNECTIONS); this.timeToLiveMillis = (Integer)getSetting(info, ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS); this.defaultMaxPerRoute = (Integer)getSetting(info, ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE); this.maxTotal = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_TOTAL); @@ -172,6 +173,7 @@ public Properties asProperties() { ret.put(ClickHouseConnectionSettings.CONNECTION_TIMEOUT.getKey(), String.valueOf(connectionTimeout)); ret.put(ClickHouseConnectionSettings.DATA_TRANSFER_TIMEOUT.getKey(), String.valueOf(dataTransferTimeout)); ret.put(ClickHouseConnectionSettings.KEEP_ALIVE_TIMEOUT.getKey(), String.valueOf(keepAliveTimeout)); + ret.put(ClickHouseConnectionSettings.REUSE_CONNECTIONS.getKey(), String.valueOf(reuseConnections)); ret.put(ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS.getKey(), String.valueOf(timeToLiveMillis)); ret.put(ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE.getKey(), String.valueOf(defaultMaxPerRoute)); ret.put(ClickHouseConnectionSettings.MAX_TOTAL.getKey(), String.valueOf(maxTotal)); @@ -239,6 +241,7 @@ public ClickHouseProperties(ClickHouseProperties properties) { setConnectionTimeout(properties.connectionTimeout); setDataTransferTimeout(properties.dataTransferTimeout); setKeepAliveTimeout(properties.keepAliveTimeout); + setReuseConnections(properties.reuseConnections); setTimeToLiveMillis(properties.timeToLiveMillis); setDefaultMaxPerRoute(properties.defaultMaxPerRoute); setMaxTotal(properties.maxTotal); @@ -527,6 +530,14 @@ public void setKeepAliveTimeout(int keepAliveTimeout) { this.keepAliveTimeout = keepAliveTimeout; } + public boolean getReuseConnections() { + return reuseConnections; + } + + public void setReuseConnections(boolean reuseConnections) { + this.reuseConnections = reuseConnections; + } + public String getUser() { return user; } diff --git a/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java b/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java index 83047cd06..3416084d5 100644 --- a/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java +++ b/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java @@ -1,5 +1,6 @@ package ru.yandex.clickhouse.util; +import org.apache.http.ConnectionReuseStrategy; import org.apache.http.Header; import org.apache.http.HeaderElement; import org.apache.http.HeaderElementIterator; @@ -13,7 +14,9 @@ import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.NoConnectionReuseStrategy; import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultClientConnectionReuseStrategy; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicHeader; @@ -59,6 +62,7 @@ public ClickHouseHttpClientBuilder(ClickHouseProperties properties) { public CloseableHttpClient buildClient() throws Exception { return HttpClientBuilder.create() .setConnectionManager(getConnectionManager()) + .setConnectionReuseStrategy(createReuseStrategy()) .setKeepAliveStrategy(createKeepAliveStrategy()) .setDefaultConnectionConfig(getConnectionConfig()) .setDefaultRequestConfig(getRequestConfig()) @@ -115,25 +119,38 @@ private Collection
getDefaultHeaders() { return headers; } - private ConnectionKeepAliveStrategy createKeepAliveStrategy() { - return new ConnectionKeepAliveStrategy() { + private ConnectionReuseStrategy createReuseStrategy() { + return properties.getReuseConnections() ? new DefaultClientConnectionReuseStrategy() { + @Override - public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) { - // in case of errors keep-alive not always works. close connection just in case + public boolean keepAlive(HttpResponse httpResponse, HttpContext context) { if (httpResponse.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) { - return -1; + return false; } + boolean keepAliveHeaderFound = false; HeaderElementIterator it = new BasicHeaderElementIterator( httpResponse.headerIterator(HTTP.CONN_DIRECTIVE)); while (it.hasNext()) { HeaderElement he = it.nextElement(); String param = he.getName(); - //String value = he.getValue(); if (param != null && param.equalsIgnoreCase(HTTP.CONN_KEEP_ALIVE)) { - return properties.getKeepAliveTimeout(); + keepAliveHeaderFound = true; + break; } } - return -1; + if (!keepAliveHeaderFound) { + return false; + } + return super.keepAlive(httpResponse, context); + } + } : new NoConnectionReuseStrategy(); + } + + private ConnectionKeepAliveStrategy createKeepAliveStrategy() { + return new ConnectionKeepAliveStrategy() { + @Override + public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) { + return properties.getKeepAliveTimeout(); } }; }