Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public enum ClickHouseConnectionSettings implements DriverPropertyCreator {
+ " ClickHouse rejects request execution if its time exceeds max_execution_time"),


@Deprecated
KEEP_ALIVE_TIMEOUT("keepAliveTimeout", 30 * 1000, ""),

/**
Expand All @@ -35,6 +36,7 @@ public enum ClickHouseConnectionSettings implements DriverPropertyCreator {
TIME_TO_LIVE_MILLIS("timeToLiveMillis", 60 * 1000, ""),
DEFAULT_MAX_PER_ROUTE("defaultMaxPerRoute", 500, ""),
MAX_TOTAL("maxTotal", 10000, ""),
MAX_RETRIES("maxRetries", 3, "Maximum retries(default to 3) for idempotent operation. Set 0 to disable retry."),

/**
* additional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class ClickHouseProperties {
private int timeToLiveMillis;
private int defaultMaxPerRoute;
private int maxTotal;
private int maxRetries;
private String host;
private int port;
private boolean usePathAsDb;
Expand Down Expand Up @@ -113,6 +114,7 @@ public ClickHouseProperties(Properties info) {
this.timeToLiveMillis = (Integer)getSetting(info, ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS);
this.defaultMaxPerRoute = (Integer)getSetting(info, ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE);
this.maxTotal = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_TOTAL);
this.maxRetries = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_RETRIES);
this.maxCompressBufferSize = (Integer) getSetting(info, ClickHouseConnectionSettings.MAX_COMPRESS_BUFFER_SIZE);
this.ssl = (Boolean) getSetting(info, ClickHouseConnectionSettings.SSL);
this.sslRootCertificate = (String) getSetting(info, ClickHouseConnectionSettings.SSL_ROOT_CERTIFICATE);
Expand Down Expand Up @@ -179,6 +181,7 @@ public Properties asProperties() {
ret.put(ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS.getKey(), String.valueOf(timeToLiveMillis));
ret.put(ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE.getKey(), String.valueOf(defaultMaxPerRoute));
ret.put(ClickHouseConnectionSettings.MAX_TOTAL.getKey(), String.valueOf(maxTotal));
ret.put(ClickHouseConnectionSettings.MAX_RETRIES.getKey(), String.valueOf(maxRetries));
ret.put(ClickHouseConnectionSettings.MAX_COMPRESS_BUFFER_SIZE.getKey(), String.valueOf(maxCompressBufferSize));
ret.put(ClickHouseConnectionSettings.SSL.getKey(), String.valueOf(ssl));
ret.put(ClickHouseConnectionSettings.SSL_ROOT_CERTIFICATE.getKey(), String.valueOf(sslRootCertificate));
Expand Down Expand Up @@ -248,6 +251,7 @@ public ClickHouseProperties(ClickHouseProperties properties) {
setTimeToLiveMillis(properties.timeToLiveMillis);
setDefaultMaxPerRoute(properties.defaultMaxPerRoute);
setMaxTotal(properties.maxTotal);
setMaxRetries(properties.maxRetries);
setMaxCompressBufferSize(properties.maxCompressBufferSize);
setSsl(properties.ssl);
setSslRootCertificate(properties.sslRootCertificate);
Expand Down Expand Up @@ -594,6 +598,14 @@ public void setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
}

public int getMaxRetries() {
return maxRetries;
}

public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}

public int getMaxCompressBufferSize() {
return maxCompressBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,19 @@

import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.NoHttpResponseException;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.AuthCache;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
Expand All @@ -50,11 +49,10 @@
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;

import ru.yandex.clickhouse.settings.ClickHouseProperties;
Expand All @@ -71,16 +69,32 @@ public ClickHouseHttpClientBuilder(ClickHouseProperties properties) {
public CloseableHttpClient buildClient() throws Exception {
return HttpClientBuilder.create()
.setConnectionManager(getConnectionManager())
.setRetryHandler(getRequestRetryHandler())
.setConnectionReuseStrategy(getConnectionReuseStrategy())
.setDefaultConnectionConfig(getConnectionConfig())
.setDefaultRequestConfig(getRequestConfig())
.setDefaultHeaders(getDefaultHeaders())
.setDefaultCredentialsProvider(getDefaultCredentialsProvider())
.disableContentCompression() // gzip здесь ни к чему. Используется lz4 при compress=1
.disableContentCompression() // gzip is not needed. Use lz4 when compress=1
.disableRedirectHandling()
.build();
}

private HttpRequestRetryHandler getRequestRetryHandler() {
final int maxRetries = properties.getMaxRetries();
return new DefaultHttpRequestRetryHandler(maxRetries, false) {
@Override
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
if (executionCount > maxRetries || context == null
|| !Boolean.TRUE.equals(context.getAttribute("is_idempotent"))) {
return false;
}

return (exception instanceof NoHttpResponseException) || super.retryRequest(exception, executionCount, context);
}
};
}

public static HttpClientContext createClientContext(ClickHouseProperties props) {
if (props == null
|| !isConfigurationValidForAuth(props))
Expand Down Expand Up @@ -155,29 +169,6 @@ private Collection<Header> getDefaultHeaders() {
return headers;
}

private ConnectionKeepAliveStrategy createKeepAliveStrategy() {
return new ConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) {
// in case of errors keep-alive not always works. close connection just in case
if (httpResponse.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) {
return -1;
}
HeaderElementIterator it = new BasicHeaderElementIterator(
httpResponse.headerIterator(HTTP.CONN_DIRECTIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
//String value = he.getValue();
if (param != null && param.equalsIgnoreCase(HTTP.CONN_KEEP_ALIVE)) {
return properties.getKeepAliveTimeout();
}
}
return -1;
}
};
}

private SSLContext getSSLContext()
throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, KeyManagementException {
SSLContext ctx = SSLContext.getInstance("TLS");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package ru.yandex.clickhouse.util;

import org.apache.http.HttpHost;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -141,7 +145,72 @@ private static Object[][] provideAuthUserPasswordTestData() {
null, null, "baz", "Basic ZGVmYXVsdDpiYXo=" // default:baz
},
};
}

private static WireMockServer newServer() {
WireMockServer server = new WireMockServer(
WireMockConfiguration.wireMockConfig().dynamicPort());
server.start();
server.stubFor(WireMock.post(WireMock.urlPathMatching("/*"))
.willReturn(WireMock.aResponse().withStatus(200).withHeader("Connection", "Keep-Alive")
.withHeader("Content-Type", "text/plain; charset=UTF-8")
.withHeader("Transfer-Encoding", "chunked").withHeader("Keep-Alive", "timeout=3")
.withBody("OK.........................").withFixedDelay(2)));
return server;
}

private static void shutDownServerWithDelay(final WireMockServer server, final long delayMs) {
new Thread() {
public void run() {
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
e.printStackTrace();
}

server.shutdownServer();
server.stop();
}
}.start();
}

// @Test(dependsOnMethods = { "testWithRetry" }, expectedExceptions = { NoHttpResponseException.class })
public void testWithoutRetry() throws Exception {
final WireMockServer server = newServer();

ClickHouseProperties props = new ClickHouseProperties();
props.setMaxRetries(0);
ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props);
CloseableHttpClient client = builder.buildClient();
HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%201");

shutDownServerWithDelay(server, 100);

try {
client.execute(post);
} finally {
client.close();
}
}

// @Test(expectedExceptions = { HttpHostConnectException.class })
public void testWithRetry() throws Exception {
final WireMockServer server = newServer();

ClickHouseProperties props = new ClickHouseProperties();
// props.setMaxRetries(3);
ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props);
CloseableHttpClient client = builder.buildClient();
HttpContext context = new BasicHttpContext();
context.setAttribute("is_idempotent", Boolean.TRUE);
HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%202");

shutDownServerWithDelay(server, 100);

try {
client.execute(post, context);
} finally {
client.close();
}
}
}