diff --git a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java index 202574fed..6928a3409 100644 --- a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java +++ b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java @@ -27,6 +27,7 @@ public enum ClickHouseConnectionSettings implements DriverPropertyCreator { + " ClickHouse rejects request execution if its time exceeds max_execution_time"), + @Deprecated KEEP_ALIVE_TIMEOUT("keepAliveTimeout", 30 * 1000, ""), /** @@ -35,6 +36,7 @@ public enum ClickHouseConnectionSettings implements DriverPropertyCreator { TIME_TO_LIVE_MILLIS("timeToLiveMillis", 60 * 1000, ""), DEFAULT_MAX_PER_ROUTE("defaultMaxPerRoute", 500, ""), MAX_TOTAL("maxTotal", 10000, ""), + MAX_RETRIES("maxRetries", 3, "Maximum retries(default to 3) for idempotent operation. Set 0 to disable retry."), /** * additional diff --git a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java index b368917e9..6c01b1943 100644 --- a/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java +++ b/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java @@ -22,6 +22,7 @@ public class ClickHouseProperties { private int timeToLiveMillis; private int defaultMaxPerRoute; private int maxTotal; + private int maxRetries; private String host; private int port; private boolean usePathAsDb; @@ -113,6 +114,7 @@ public ClickHouseProperties(Properties info) { this.timeToLiveMillis = (Integer)getSetting(info, ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS); this.defaultMaxPerRoute = (Integer)getSetting(info, ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE); this.maxTotal = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_TOTAL); + this.maxRetries = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_RETRIES); this.maxCompressBufferSize = (Integer) getSetting(info, ClickHouseConnectionSettings.MAX_COMPRESS_BUFFER_SIZE); this.ssl = (Boolean) getSetting(info, ClickHouseConnectionSettings.SSL); this.sslRootCertificate = (String) getSetting(info, ClickHouseConnectionSettings.SSL_ROOT_CERTIFICATE); @@ -179,6 +181,7 @@ public Properties asProperties() { ret.put(ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS.getKey(), String.valueOf(timeToLiveMillis)); ret.put(ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE.getKey(), String.valueOf(defaultMaxPerRoute)); ret.put(ClickHouseConnectionSettings.MAX_TOTAL.getKey(), String.valueOf(maxTotal)); + ret.put(ClickHouseConnectionSettings.MAX_RETRIES.getKey(), String.valueOf(maxRetries)); ret.put(ClickHouseConnectionSettings.MAX_COMPRESS_BUFFER_SIZE.getKey(), String.valueOf(maxCompressBufferSize)); ret.put(ClickHouseConnectionSettings.SSL.getKey(), String.valueOf(ssl)); ret.put(ClickHouseConnectionSettings.SSL_ROOT_CERTIFICATE.getKey(), String.valueOf(sslRootCertificate)); @@ -248,6 +251,7 @@ public ClickHouseProperties(ClickHouseProperties properties) { setTimeToLiveMillis(properties.timeToLiveMillis); setDefaultMaxPerRoute(properties.defaultMaxPerRoute); setMaxTotal(properties.maxTotal); + setMaxRetries(properties.maxRetries); setMaxCompressBufferSize(properties.maxCompressBufferSize); setSsl(properties.ssl); setSslRootCertificate(properties.sslRootCertificate); @@ -594,6 +598,14 @@ public void setMaxTotal(int maxTotal) { this.maxTotal = maxTotal; } + public int getMaxRetries() { + return maxRetries; + } + + public void setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + } + public int getMaxCompressBufferSize() { return maxCompressBufferSize; } diff --git a/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java b/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java index 59e893147..66bc9980b 100644 --- a/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java +++ b/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java @@ -27,20 +27,19 @@ import org.apache.http.ConnectionReuseStrategy; import org.apache.http.Header; -import org.apache.http.HeaderElement; -import org.apache.http.HeaderElementIterator; import org.apache.http.HttpHeaders; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; +import org.apache.http.NoHttpResponseException; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.AuthCache; import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.config.ConnectionConfig; import org.apache.http.config.RegistryBuilder; -import org.apache.http.conn.ConnectionKeepAliveStrategy; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.NoopHostnameVerifier; @@ -50,11 +49,10 @@ import org.apache.http.impl.client.BasicAuthCache; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicHeader; -import org.apache.http.message.BasicHeaderElementIterator; -import org.apache.http.protocol.HTTP; import org.apache.http.protocol.HttpContext; import ru.yandex.clickhouse.settings.ClickHouseProperties; @@ -71,16 +69,32 @@ public ClickHouseHttpClientBuilder(ClickHouseProperties properties) { public CloseableHttpClient buildClient() throws Exception { return HttpClientBuilder.create() .setConnectionManager(getConnectionManager()) + .setRetryHandler(getRequestRetryHandler()) .setConnectionReuseStrategy(getConnectionReuseStrategy()) .setDefaultConnectionConfig(getConnectionConfig()) .setDefaultRequestConfig(getRequestConfig()) .setDefaultHeaders(getDefaultHeaders()) .setDefaultCredentialsProvider(getDefaultCredentialsProvider()) - .disableContentCompression() // gzip здесь ни к чему. Используется lz4 при compress=1 + .disableContentCompression() // gzip is not needed. Use lz4 when compress=1 .disableRedirectHandling() .build(); } + private HttpRequestRetryHandler getRequestRetryHandler() { + final int maxRetries = properties.getMaxRetries(); + return new DefaultHttpRequestRetryHandler(maxRetries, false) { + @Override + public boolean retryRequest(IOException exception, int executionCount, HttpContext context) { + if (executionCount > maxRetries || context == null + || !Boolean.TRUE.equals(context.getAttribute("is_idempotent"))) { + return false; + } + + return (exception instanceof NoHttpResponseException) || super.retryRequest(exception, executionCount, context); + } + }; + } + public static HttpClientContext createClientContext(ClickHouseProperties props) { if (props == null || !isConfigurationValidForAuth(props)) @@ -155,29 +169,6 @@ private Collection
getDefaultHeaders() { return headers; } - private ConnectionKeepAliveStrategy createKeepAliveStrategy() { - return new ConnectionKeepAliveStrategy() { - @Override - public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) { - // in case of errors keep-alive not always works. close connection just in case - if (httpResponse.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) { - return -1; - } - HeaderElementIterator it = new BasicHeaderElementIterator( - httpResponse.headerIterator(HTTP.CONN_DIRECTIVE)); - while (it.hasNext()) { - HeaderElement he = it.nextElement(); - String param = he.getName(); - //String value = he.getValue(); - if (param != null && param.equalsIgnoreCase(HTTP.CONN_KEEP_ALIVE)) { - return properties.getKeepAliveTimeout(); - } - } - return -1; - } - }; - } - private SSLContext getSSLContext() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, KeyManagementException { SSLContext ctx = SSLContext.getInstance("TLS"); diff --git a/src/test/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilderTest.java b/src/test/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilderTest.java index e7cc88027..e7eee897f 100644 --- a/src/test/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilderTest.java +++ b/src/test/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilderTest.java @@ -1,8 +1,12 @@ package ru.yandex.clickhouse.util; import org.apache.http.HttpHost; +import org.apache.http.NoHttpResponseException; import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.HttpHostConnectException; import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.protocol.BasicHttpContext; +import org.apache.http.protocol.HttpContext; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -141,7 +145,72 @@ private static Object[][] provideAuthUserPasswordTestData() { null, null, "baz", "Basic ZGVmYXVsdDpiYXo=" // default:baz }, }; + } + + private static WireMockServer newServer() { + WireMockServer server = new WireMockServer( + WireMockConfiguration.wireMockConfig().dynamicPort()); + server.start(); + server.stubFor(WireMock.post(WireMock.urlPathMatching("/*")) + .willReturn(WireMock.aResponse().withStatus(200).withHeader("Connection", "Keep-Alive") + .withHeader("Content-Type", "text/plain; charset=UTF-8") + .withHeader("Transfer-Encoding", "chunked").withHeader("Keep-Alive", "timeout=3") + .withBody("OK.........................").withFixedDelay(2))); + return server; + } + private static void shutDownServerWithDelay(final WireMockServer server, final long delayMs) { + new Thread() { + public void run() { + try { + Thread.sleep(delayMs); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + server.shutdownServer(); + server.stop(); + } + }.start(); } + // @Test(dependsOnMethods = { "testWithRetry" }, expectedExceptions = { NoHttpResponseException.class }) + public void testWithoutRetry() throws Exception { + final WireMockServer server = newServer(); + + ClickHouseProperties props = new ClickHouseProperties(); + props.setMaxRetries(0); + ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props); + CloseableHttpClient client = builder.buildClient(); + HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%201"); + + shutDownServerWithDelay(server, 100); + + try { + client.execute(post); + } finally { + client.close(); + } + } + + // @Test(expectedExceptions = { HttpHostConnectException.class }) + public void testWithRetry() throws Exception { + final WireMockServer server = newServer(); + + ClickHouseProperties props = new ClickHouseProperties(); + // props.setMaxRetries(3); + ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props); + CloseableHttpClient client = builder.buildClient(); + HttpContext context = new BasicHttpContext(); + context.setAttribute("is_idempotent", Boolean.TRUE); + HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%202"); + + shutDownServerWithDelay(server, 100); + + try { + client.execute(post, context); + } finally { + client.close(); + } + } }