From bf76f288563a2198c6ba6b3862bd980a4933ceb7 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Tue, 5 Aug 2025 17:10:21 -0700 Subject: [PATCH 1/6] implemented network timeout. no tests --- .../client/api/insert/InsertSettings.java | 24 ++++++ .../client/api/query/QuerySettings.java | 12 +++ .../com/clickhouse/jdbc/ConnectionImpl.java | 75 ++++++++++++++----- .../com/clickhouse/jdbc/ResultSetImpl.java | 4 + .../com/clickhouse/jdbc/StatementImpl.java | 13 ++++ .../clickhouse/jdbc/WriterStatementImpl.java | 10 +++ .../clickhouse/jdbc/internal/JdbcUtils.java | 11 +++ .../com/clickhouse/jdbc/ConnectionTest.java | 26 ++++--- 8 files changed, 147 insertions(+), 28 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java index 6179b09ce..2e24fe9d2 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java @@ -5,6 +5,7 @@ import com.clickhouse.client.api.ClientConfigProperties; import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.internal.ValidationUtils; +import com.clickhouse.client.api.query.QuerySettings; import org.apache.hc.core5.http.HttpHeaders; import java.util.Collection; @@ -264,4 +265,27 @@ public InsertSettings logComment(String logComment) { public String getLogComment() { return logComment; } + + public static InsertSettings merge(InsertSettings source, InsertSettings override) { + InsertSettings merged = new InsertSettings(); + if (source != null) { + merged.rawSettings.putAll(source.rawSettings); + } + if (override != null && override != source) {// avoid copying the literally same object + merged.rawSettings.putAll(override.rawSettings); + } + return merged; + } + + public void setNetworkTimeout(Long networkTimeout) { + if (networkTimeout != null) { + rawSettings.put(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), networkTimeout.intValue()); + } else { + rawSettings.remove(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey()); + } + } + + public Long getNetworkTimeout() { + return (Long) rawSettings.get(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey()); + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java b/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java index 16580d129..f6c0d73dc 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java @@ -267,6 +267,18 @@ public String getLogComment() { return logComment; } + public void setNetworkTimeout(Long networkTimeout) { + if (networkTimeout != null) { + rawSettings.put(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), networkTimeout.intValue()); + } else { + rawSettings.remove(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey()); + } + } + + public Long getNetworkTimeout() { + return (Long) rawSettings.get(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey()); + } + public static QuerySettings merge(QuerySettings source, QuerySettings override) { QuerySettings merged = new QuerySettings(); if (source != null) { diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java index 9d86c8f1e..d7c7f16ec 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java @@ -2,11 +2,15 @@ import com.clickhouse.client.api.Client; import com.clickhouse.client.api.ClientConfigProperties; +import com.clickhouse.client.api.ClientException; +import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader; import com.clickhouse.client.api.internal.ServerSettings; import com.clickhouse.client.api.metadata.TableSchema; import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.client.api.query.QueryResponse; import com.clickhouse.client.api.query.QuerySettings; import com.clickhouse.data.ClickHouseDataType; +import com.clickhouse.data.ClickHouseFormat; import com.clickhouse.jdbc.internal.ExceptionUtils; import com.clickhouse.jdbc.internal.JdbcConfiguration; import com.clickhouse.jdbc.internal.JdbcUtils; @@ -16,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.sql.Array; import java.sql.Blob; import java.sql.CallableStatement; @@ -43,10 +48,11 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class ConnectionImpl implements Connection, JdbcV2Wrapper { - private static final Logger log = LoggerFactory.getLogger(ConnectionImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(ConnectionImpl.class); protected final String url; private final Client client; // this member is private to force using getClient() @@ -65,9 +71,11 @@ public class ConnectionImpl implements Connection, JdbcV2Wrapper { private final SqlParser sqlParser; + private Executor networkTimeoutExecutor; + public ConnectionImpl(String url, Properties info) throws SQLException { try { - log.debug("Creating connection to {}", url); + LOG.debug("Creating connection to {}", url); this.url = url;//Raw URL this.config = new JdbcConfiguration(url, info); this.onCluster = false; @@ -86,10 +94,10 @@ public ConnectionImpl(String url, Properties info) throws SQLException { } if (this.config.isDisableFrameworkDetection()) { - log.debug("Framework detection is disabled."); + LOG.debug("Framework detection is disabled."); } else { String detectedFrameworks = Driver.FrameworksDetection.getFrameworksDetected(); - log.debug("Detected frameworks: {}", detectedFrameworks); + LOG.debug("Detected frameworks: {}", detectedFrameworks); if (!detectedFrameworks.trim().isEmpty()) { clientName += " (" + detectedFrameworks + ")"; } @@ -210,9 +218,8 @@ public void close() throws SQLException { if (isClosed()) { return; } - - client.close(); - closed = true; + closed = true; // mark as closed to prevent further invocations + client.close(); // this will disrupt pending requests. } @Override @@ -597,27 +604,59 @@ public String getSchema() throws SQLException { @Override public void abort(Executor executor) throws SQLException { - if (!config.isIgnoreUnsupportedRequests()) { - throw new SQLFeatureNotSupportedException("abort not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED); + if (executor == null) { + throw new SQLException("Executor must be not null"); } + // This method should check permissions with SecurityManager but the one is deprecated. + // There is no replacement for SecurityManger and it is marked for removal. + this.close(); } @Override public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { - //TODO: Should this be supported? - if (!config.isIgnoreUnsupportedRequests()) { - throw new SQLFeatureNotSupportedException("setNetworkTimeout not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED); + ensureOpen(); + + // Very good mail thread about this method implementation. https://mail.openjdk.org/pipermail/jdbc-spec-discuss/2017-November/000236.html + + // This method should check permissions with SecurityManager but the one is deprecated. + // There is no replacement for SecurityManger and it is marked for removal. + if (milliseconds > 0 && executor == null) { + // we need executor only for positive timeout values. + throw new SQLException("Executor must be not null"); + } + if (milliseconds < 0) { + throw new SQLException("Timeout must be >= 0"); + } + + // How it should work: + // if timeout is set with this method then any timeout exception should be reported to the connection + // when connection get signal about timeout it uses executor to abort itself + // Some connection pools set timeout before calling Connection#close() to ensure that this operation will not hang + // Socket timeout is propagated with QuerySettings this connection has. + networkTimeoutExecutor = executor; + defaultQuerySettings.setOption(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), (long)milliseconds); + } + + + // Should be called by child object to notify about timeout. + public void onNetworkTimeout() throws SQLException { + if (isClosed() || networkTimeoutExecutor == null) { + return; // we closed already so do nothing. } + + networkTimeoutExecutor.execute(() -> { + try { + this.abort(networkTimeoutExecutor); + } catch (SQLException e) { + throw new RuntimeException("Failed to abort connection", e); + } + }); } @Override public int getNetworkTimeout() throws SQLException { - //TODO: Should this be supported? - if (!config.isIgnoreUnsupportedRequests()) { - throw new SQLFeatureNotSupportedException("getNetworkTimeout not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED); - } - - return -1; + Long networkTimeout = defaultQuerySettings.getNetworkTimeout(); + return networkTimeout == null ? 0 : networkTimeout.intValue(); } /** diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java index fa6cd26a0..4faab3ac8 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java @@ -15,6 +15,7 @@ import java.io.Reader; import java.io.StringReader; import java.math.BigDecimal; +import java.net.SocketTimeoutException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.sql.Blob; @@ -99,6 +100,9 @@ public boolean next() throws SQLException { try { return reader.next() != null; } catch (Exception e) { + if (e instanceof SocketTimeoutException) { + this.parentStatement.onNetworkTimeout(); + } throw ExceptionUtils.toSqlState(e); } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index 3e1a64848..1a53563fe 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.SocketTimeoutException; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLWarning; @@ -168,6 +169,10 @@ protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) thr } } onResultSetClosed(null); + + if (e instanceof SocketTimeoutException) { + this.connection.onNetworkTimeout(); + } throw ExceptionUtils.toSqlState(e); } } @@ -203,6 +208,9 @@ protected long executeUpdateImpl(String sql, QuerySettings settings) throws SQLE updateCount = Math.max(0, (int) response.getWrittenRows()); // when statement alters schema no result rows returned. lastQueryId = response.getQueryId(); } catch (Exception e) { + if (e instanceof SocketTimeoutException) { + this.connection.onNetworkTimeout(); + } throw ExceptionUtils.toSqlState(e); } @@ -610,4 +618,9 @@ public long executeLargeUpdate(String sql, String[] columnNames) throws SQLExcep public String getLastQueryId() { return lastQueryId; } + + // Proxy method for child objects. Do not call. + public void onNetworkTimeout() throws SQLException { + this.connection.onNetworkTimeout(); + } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java index 833a7f352..6e81931d1 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java @@ -16,6 +16,7 @@ import java.io.InputStream; import java.io.Reader; import java.math.BigDecimal; +import java.net.SocketTimeoutException; import java.net.URL; import java.sql.Array; import java.sql.Blob; @@ -109,6 +110,9 @@ public long executeLargeUpdate() throws SQLException { try { writer.commitRow(); } catch (Exception e) { + if (e instanceof SocketTimeoutException) { + this.connection.onNetworkTimeout(); + } throw new SQLException(e); } @@ -121,6 +125,9 @@ public long executeLargeUpdate() throws SQLException { updateCount = Math.max(0, (int) response.getWrittenRows()); // when statement alters schema no result rows returned. lastQueryId = response.getQueryId(); } catch (Exception e) { + if (e instanceof SocketTimeoutException) { + this.connection.onNetworkTimeout(); + } throw ExceptionUtils.toSqlState(e); } finally { try { @@ -298,6 +305,9 @@ public void addBatch() throws SQLException { try { writer.commitRow(); } catch (Exception e) { + if (e instanceof SocketTimeoutException) { + this.connection.onNetworkTimeout(); + } throw new SQLException(e); } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java index 9752d98ab..d9d780395 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java @@ -6,6 +6,7 @@ import com.clickhouse.data.Tuple; import com.clickhouse.jdbc.types.Array; import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; import java.awt.*; import java.math.BigInteger; @@ -297,4 +298,14 @@ public static List convertList(List values, Class type) throw } return convertedValues; } + + public static void safeClose(AutoCloseable closeable, Logger logger) { + if (closeable != null) { + try { + closeable.close(); + } catch (Exception ex) { + logger.warn("Failed to close closeable after exception", ex); + } + } + } } diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java index bec3d83c5..5e209e012 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java @@ -28,10 +28,13 @@ import java.util.Base64; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; public class ConnectionTest extends JdbcIntegrationTest { @@ -384,20 +387,23 @@ public void setSchemaTest() throws SQLException { @Test(groups = { "integration" }) public void abortTest() throws SQLException { - Connection localConnection = this.getJdbcConnection(); - assertThrows(SQLFeatureNotSupportedException.class, () -> localConnection.abort(null)); + try (Connection conn = this.getJdbcConnection()) { + conn.abort(Executors.newSingleThreadExecutor()); + assertTrue(conn.isClosed()); + } } @Test(groups = { "integration" }) - public void setNetworkTimeoutTest() throws SQLException { - Connection localConnection = this.getJdbcConnection(); - assertThrows(SQLFeatureNotSupportedException.class, () -> localConnection.setNetworkTimeout(null, 0)); - } + public void testNetworkTimeout() throws SQLException { + try { + Connection conn = this.getJdbcConnection(); + int t1 = (int) TimeUnit.SECONDS.toMillis(20); + conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), t1); + Assert.assertEquals(t1, conn.getNetworkTimeout()); - @Test(groups = { "integration" }) - public void getNetworkTimeoutTest() throws SQLException { - Connection localConnection = this.getJdbcConnection(); - assertThrows(SQLFeatureNotSupportedException.class, localConnection::getNetworkTimeout); + } catch (Exception e) { + + } } @Test(groups = { "integration" }) From 4a0540d2f18b3100ebd681108c9aa41cde9e8a52 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 25 Aug 2025 15:45:25 -0700 Subject: [PATCH 2/6] merge main. a few fixes --- .../client/api/insert/InsertSettings.java | 36 ++++++++++--------- .../client/api/internal/CommonSettings.java | 21 +++++++++++ .../client/api/query/QuerySettings.java | 21 ++++++----- .../com/clickhouse/jdbc/ConnectionImpl.java | 3 +- 4 files changed, 55 insertions(+), 26 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java index 1c1fd9984..ef64ac49d 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java @@ -3,9 +3,9 @@ import com.clickhouse.client.api.Client; import com.clickhouse.client.api.ClientConfigProperties; import com.clickhouse.client.api.internal.CommonSettings; -import com.clickhouse.client.api.query.QuerySettings; import org.apache.hc.core5.http.HttpHeaders; +import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.Map; @@ -28,6 +28,11 @@ public InsertSettings(Map settings) { } } + private InsertSettings(CommonSettings settings) { + this.settings = settings; + setDefaults(); + } + private void setDefaults() {// Default settings, for now a very small list this.setInputStreamCopyBufferSize(DEFAULT_INPUT_STREAM_BATCH_SIZE); } @@ -277,25 +282,24 @@ public String getLogComment() { } public static InsertSettings merge(InsertSettings source, InsertSettings override) { - InsertSettings merged = new InsertSettings(); - if (source != null) { - merged.rawSettings.putAll(source.rawSettings); - } - if (override != null && override != source) {// avoid copying the literally same object - merged.rawSettings.putAll(override.rawSettings); - } - return merged; + CommonSettings mergedSettings = source.settings.copyAndMerge(override.settings); + return new InsertSettings(mergedSettings); } - public void setNetworkTimeout(Long networkTimeout) { - if (networkTimeout != null) { - rawSettings.put(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), networkTimeout.intValue()); - } else { - rawSettings.remove(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey()); - } + /** + * Sets a network operation timeout. + * @param timeout + * @param unit + */ + public void setNetworkTimeout(long timeout, ChronoUnit unit) { + settings.setNetworkTimeout(timeout, unit); } + /** + * Returns network timeout. Zero value is returned if no timeout is set. + * @return timeout in ms. + */ public Long getNetworkTimeout() { - return (Long) rawSettings.get(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey()); + return settings.getNetworkTimeout(); } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java index d703570b7..72014f632 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java @@ -3,6 +3,8 @@ import com.clickhouse.client.api.Client; import com.clickhouse.client.api.ClientConfigProperties; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -217,6 +219,25 @@ public String getLogComment() { return logComment; } + /** + * Sets a network operation timeout. + * @param timeout + * @param unit + */ + public void setNetworkTimeout(long timeout, ChronoUnit unit) { + settings.put(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), Duration.of(timeout, unit).toMillis()); + } + + /** + * Returns network timeout. Zero value is returned if no timeout is set. + * @return timeout in ms. + */ + public Long getNetworkTimeout() { + return (Long) getOption(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), + ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getDefaultValue()); + } + + public CommonSettings copyAndMerge(CommonSettings override) { CommonSettings copy = new CommonSettings(); copy.settings.putAll(settings); diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java b/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java index fcd884089..1ade90c3d 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java @@ -8,8 +8,8 @@ import com.clickhouse.client.api.internal.ValidationUtils; import com.clickhouse.data.ClickHouseFormat; +import java.time.temporal.ChronoUnit; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.TimeZone; @@ -269,16 +269,21 @@ public String getLogComment() { return settings.getLogComment(); } - public void setNetworkTimeout(Long networkTimeout) { - if (networkTimeout != null) { - rawSettings.put(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), networkTimeout.intValue()); - } else { - rawSettings.remove(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey()); - } + /** + * Sets a network operation timeout. + * @param timeout + * @param unit + */ + public void setNetworkTimeout(long timeout, ChronoUnit unit) { + settings.setNetworkTimeout(timeout, unit); } + /** + * Returns network timeout. Zero value is returned if no timeout is set. + * @return timeout in ms. + */ public Long getNetworkTimeout() { - return (Long) rawSettings.get(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey()); + return settings.getNetworkTimeout(); } public static QuerySettings merge(QuerySettings source, QuerySettings override) { diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java index c246d68b3..c4b26cdde 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java @@ -658,8 +658,7 @@ public void onNetworkTimeout() throws SQLException { @Override public int getNetworkTimeout() throws SQLException { - Long networkTimeout = defaultQuerySettings.getNetworkTimeout(); - return networkTimeout == null ? 0 : networkTimeout.intValue(); + return defaultQuerySettings.getNetworkTimeout().intValue(); } /** From 16520a7f5db87c3e59e4fe51170dbbfd1833ed0f Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 25 Aug 2025 18:16:29 -0700 Subject: [PATCH 3/6] complete Connection#setNetworkTimeout() tests --- .../api/internal/HttpAPIClientHelper.java | 16 +++++----- .../com/clickhouse/jdbc/StatementImpl.java | 15 ++++++--- .../clickhouse/jdbc/WriterStatementImpl.java | 13 ++------ .../com/clickhouse/jdbc/ConnectionTest.java | 31 ++++++++++++++++--- 4 files changed, 47 insertions(+), 28 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 2ed56cbbb..5ce5feca4 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -110,8 +110,6 @@ public class HttpAPIClientHelper { private final CloseableHttpClient httpClient; - private final RequestConfig baseRequestConfig; - private String proxyAuthHeaderValue; private final Set defaultRetryCauses; @@ -125,11 +123,6 @@ public HttpAPIClientHelper(Map configuration, Object metricsRegi this.metricsRegistry = metricsRegistry; this.httpClient = createHttpClient(initSslContext, configuration); - RequestConfig.Builder reqConfBuilder = RequestConfig.custom(); - reqConfBuilder.setConnectionRequestTimeout(ClientConfigProperties.CONNECTION_REQUEST_TIMEOUT.getOrDefault(configuration), TimeUnit.MILLISECONDS); - - this.baseRequestConfig = reqConfBuilder.build(); - boolean usingClientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(configuration); boolean usingServerCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(configuration); boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(configuration); @@ -438,12 +431,19 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig); - req.setConfig(baseRequestConfig); + // setting entity. wrapping if compression is enabled req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback), clientCompression, useHttpCompression, appCompressedData, lz4Factory, requestConfig)); HttpClientContext context = HttpClientContext.create(); + Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig); + Number connectionReqTimeout = ClientConfigProperties.CONNECTION_REQUEST_TIMEOUT.getOrDefault(requestConfig); + RequestConfig reqHttpConf = RequestConfig.custom() + .setResponseTimeout(responseTimeout.longValue(), TimeUnit.MILLISECONDS) + .setConnectionRequestTimeout(connectionReqTimeout.longValue(), TimeUnit.MILLISECONDS) + .build(); + context.setRequestConfig(reqHttpConf); ClassicHttpResponse httpResponse = null; try { diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index 8d8863ade..74e7a52ab 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -175,12 +175,19 @@ protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) thr LOG.warn("Failed to close response after exception", e); } } + handleSocketTimeoutException(e); onResultSetClosed(null); + throw ExceptionUtils.toSqlState(e); + } + } - if (e instanceof SocketTimeoutException) { + protected void handleSocketTimeoutException(Exception e) { + if (e.getCause() instanceof SocketTimeoutException || e instanceof SocketTimeoutException) { + try { this.connection.onNetworkTimeout(); + } catch (SQLException e1) { + LOG.warn("Failed to handle network timeout exception", e1); } - throw ExceptionUtils.toSqlState(e); } } @@ -215,9 +222,7 @@ protected long executeUpdateImpl(String sql, QuerySettings settings) throws SQLE updateCount = Math.max(0, (int) response.getWrittenRows()); // when statement alters schema no result rows returned. lastQueryId = response.getQueryId(); } catch (Exception e) { - if (e instanceof SocketTimeoutException) { - this.connection.onNetworkTimeout(); - } + handleSocketTimeoutException(e); throw ExceptionUtils.toSqlState(e); } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java index 6e81931d1..582065016 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java @@ -16,7 +16,6 @@ import java.io.InputStream; import java.io.Reader; import java.math.BigDecimal; -import java.net.SocketTimeoutException; import java.net.URL; import java.sql.Array; import java.sql.Blob; @@ -110,9 +109,7 @@ public long executeLargeUpdate() throws SQLException { try { writer.commitRow(); } catch (Exception e) { - if (e instanceof SocketTimeoutException) { - this.connection.onNetworkTimeout(); - } + handleSocketTimeoutException(e); throw new SQLException(e); } @@ -125,9 +122,7 @@ public long executeLargeUpdate() throws SQLException { updateCount = Math.max(0, (int) response.getWrittenRows()); // when statement alters schema no result rows returned. lastQueryId = response.getQueryId(); } catch (Exception e) { - if (e instanceof SocketTimeoutException) { - this.connection.onNetworkTimeout(); - } + handleSocketTimeoutException(e); throw ExceptionUtils.toSqlState(e); } finally { try { @@ -305,9 +300,7 @@ public void addBatch() throws SQLException { try { writer.commitRow(); } catch (Exception e) { - if (e instanceof SocketTimeoutException) { - this.connection.onNetworkTimeout(); - } + handleSocketTimeoutException(e); throw new SQLException(e); } } diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java index 7fcc8fd8b..e8ae2e251 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java @@ -28,6 +28,7 @@ import java.util.Base64; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -396,13 +397,33 @@ public void abortTest() throws SQLException { } } - @Test(groups = { "integration" }) + @Test(groups = {"integration"}) public void testNetworkTimeout() throws SQLException { + try (Connection conn = this.getJdbcConnection()) { + Assert.assertThrows(SQLException.class, () -> conn.setNetworkTimeout(null, 1000)); + Assert.assertThrows(SQLException.class, () -> conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), -1)); + + int timeout = 10; + ExecutorService executorService = Executors.newSingleThreadExecutor(); + conn.setNetworkTimeout(executorService, timeout); + Assert.assertEquals(conn.getNetworkTimeout(), timeout); + Statement stmt = conn.createStatement(); + try { + ResultSet rs = stmt.executeQuery("SELECT sleepEachRow(1) FROM system.numbers LIMIT 2"); + fail("Exception expected"); + } catch (Exception e) { + Assert.assertTrue(conn.isClosed()); + Assert.assertFalse(conn.isValid(1000)); + conn.close(); + } + + try { + stmt.executeQuery("SELECT 1"); + } catch (SQLException e) { + Assert.assertTrue(e.getMessage().contains("closed")); + } + } try { - Connection conn = this.getJdbcConnection(); - int t1 = (int) TimeUnit.SECONDS.toMillis(20); - conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), t1); - Assert.assertEquals(t1, conn.getNetworkTimeout()); } catch (Exception e) { From c185ad9692a2f73c8a2746e282fbb0327e22b284 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 25 Aug 2025 18:50:01 -0700 Subject: [PATCH 4/6] fixed test to wait task completeness --- .../src/test/java/com/clickhouse/jdbc/ConnectionTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java index e8ae2e251..6bbfc4814 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java @@ -398,7 +398,7 @@ public void abortTest() throws SQLException { } @Test(groups = {"integration"}) - public void testNetworkTimeout() throws SQLException { + public void testNetworkTimeout() throws Exception { try (Connection conn = this.getJdbcConnection()) { Assert.assertThrows(SQLException.class, () -> conn.setNetworkTimeout(null, 1000)); Assert.assertThrows(SQLException.class, () -> conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), -1)); @@ -408,13 +408,15 @@ public void testNetworkTimeout() throws SQLException { conn.setNetworkTimeout(executorService, timeout); Assert.assertEquals(conn.getNetworkTimeout(), timeout); Statement stmt = conn.createStatement(); - try { - ResultSet rs = stmt.executeQuery("SELECT sleepEachRow(1) FROM system.numbers LIMIT 2"); + try (ResultSet rs = stmt.executeQuery("SELECT sleepEachRow(1) FROM system.numbers LIMIT 2")) { fail("Exception expected"); } catch (Exception e) { + executorService.shutdown(); + executorService.awaitTermination(20, TimeUnit.SECONDS); Assert.assertTrue(conn.isClosed()); Assert.assertFalse(conn.isValid(1000)); conn.close(); + } try { From 48b092c01b1d45805d7bffacdcd59f0c9730485e Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 25 Aug 2025 22:52:54 -0700 Subject: [PATCH 5/6] optimized code --- .../client/api/insert/InsertSettings.java | 4 +- .../com/clickhouse/client/SettingsTests.java | 41 +++++++++++++++---- .../com/clickhouse/jdbc/ConnectionImpl.java | 3 +- .../com/clickhouse/jdbc/ResultSetImpl.java | 22 ++++------ .../com/clickhouse/jdbc/StatementImpl.java | 7 +--- .../clickhouse/jdbc/internal/JdbcUtils.java | 10 ----- 6 files changed, 48 insertions(+), 39 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java index ef64ac49d..29e24d508 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java @@ -283,7 +283,9 @@ public String getLogComment() { public static InsertSettings merge(InsertSettings source, InsertSettings override) { CommonSettings mergedSettings = source.settings.copyAndMerge(override.settings); - return new InsertSettings(mergedSettings); + InsertSettings insertSettings = new InsertSettings(mergedSettings); + insertSettings.setInputStreamCopyBufferSize(override.getInputStreamCopyBufferSize()); + return insertSettings; } /** diff --git a/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java b/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java index f4331ae6d..bb7dcd8d3 100644 --- a/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java @@ -6,9 +6,11 @@ import org.testng.Assert; import org.testng.annotations.Test; +import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; @Test(groups = {"unit"}) public class SettingsTests { @@ -22,15 +24,28 @@ void testClientSettings() { } @Test - void testMergeQuerySettings() { - QuerySettings settings1 = new QuerySettings().setQueryId("test1").httpHeader("key1", "value1"); - QuerySettings settings2 = new QuerySettings().httpHeader("key1", "value2"); + void testMergeSettings() { + { + QuerySettings settings1 = new QuerySettings().setQueryId("test1").httpHeader("key1", "value1"); + QuerySettings settings2 = new QuerySettings().httpHeader("key1", "value2"); + + QuerySettings merged = QuerySettings.merge(settings1, settings2); + Assert.assertNotSame(merged, settings1); + Assert.assertNotSame(merged, settings2); - QuerySettings merged = QuerySettings.merge(settings1, settings2); - Assert.assertNotSame(merged, settings1); - Assert.assertNotSame(merged, settings2); + Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2"); + } + { + InsertSettings settings1 = new InsertSettings().setQueryId("test1").httpHeader("key1", "value1"); + InsertSettings settings2 = new InsertSettings().httpHeader("key1", "value2").setInputStreamCopyBufferSize(200000); - Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2"); + InsertSettings merged = InsertSettings.merge(settings1, settings2); + Assert.assertNotSame(merged, settings1); + Assert.assertNotSame(merged, settings2); + + Assert.assertEquals(merged.getInputStreamCopyBufferSize(), settings2.getInputStreamCopyBufferSize()); + Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2"); + } } @Test @@ -87,6 +102,12 @@ void testQuerySettingsSpecific() throws Exception { settings.logComment(null); Assert.assertNull(settings.getLogComment()); } + + { + final QuerySettings settings = new QuerySettings(); + settings.setNetworkTimeout(10, ChronoUnit.SECONDS); + Assert.assertEquals(settings.getNetworkTimeout(), TimeUnit.SECONDS.toMillis(10)); + } } @Test @@ -140,5 +161,11 @@ public void testInsertSettingsSpecific() throws Exception { settings.logComment(null); Assert.assertNull(settings.getLogComment()); } + + { + final InsertSettings settings = new InsertSettings(); + settings.setNetworkTimeout(10, ChronoUnit.SECONDS); + Assert.assertEquals(settings.getNetworkTimeout(), TimeUnit.SECONDS.toMillis(10)); + } } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java index c4b26cdde..5ac22198c 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java @@ -39,6 +39,7 @@ import java.sql.Statement; import java.sql.Struct; import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Calendar; import java.util.Collections; @@ -637,7 +638,7 @@ public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLExc // Some connection pools set timeout before calling Connection#close() to ensure that this operation will not hang // Socket timeout is propagated with QuerySettings this connection has. networkTimeoutExecutor = executor; - defaultQuerySettings.setOption(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), (long)milliseconds); + defaultQuerySettings.setNetworkTimeout(milliseconds, ChronoUnit.MILLIS); } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java index 548d08ddb..d7bf05525 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java @@ -37,6 +37,7 @@ import java.time.ZonedDateTime; import java.util.Calendar; import java.util.Map; +import java.util.function.Consumer; public class ResultSetImpl implements ResultSet, JdbcV2Wrapper { private static final Logger log = LoggerFactory.getLogger(ResultSetImpl.class); @@ -57,7 +58,10 @@ public class ResultSetImpl implements ResultSet, JdbcV2Wrapper { private int fetchSize; - public ResultSetImpl(StatementImpl parentStatement, QueryResponse response, ClickHouseBinaryFormatReader reader) throws SQLException { + private Consumer onDataTransferException; + + public ResultSetImpl(StatementImpl parentStatement, QueryResponse response, ClickHouseBinaryFormatReader reader, + Consumer onDataTransferException) throws SQLException { this.parentStatement = parentStatement; this.response = response; this.reader = reader; @@ -73,17 +77,7 @@ public ResultSetImpl(StatementImpl parentStatement, QueryResponse response, Clic this.defaultCalendar = parentStatement.getConnection().defaultCalendar; this.rowPos = BEFORE_FIRST; this.fetchSize = parentStatement.getFetchSize(); - } - - protected ResultSetImpl(ResultSetImpl resultSet) throws SQLException{ - this.parentStatement = resultSet.parentStatement; - this.response = resultSet.response; - this.reader = resultSet.reader; - this.metaData = resultSet.metaData; - this.closed = false; - this.wasNull = false; - this.defaultCalendar = parentStatement.getConnection().defaultCalendar; - this.featureManager = new FeatureManager(parentStatement.getConnection().getJdbcConfig()); + this.onDataTransferException = onDataTransferException; } private void checkClosed() throws SQLException { @@ -118,8 +112,8 @@ public boolean next() throws SQLException { } return readerRow != null; } catch (Exception e) { - if (e instanceof SocketTimeoutException) { - this.parentStatement.onNetworkTimeout(); + if (onDataTransferException != null) { + onDataTransferException.accept(e); } throw ExceptionUtils.toSqlState(e); } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index 74e7a52ab..42bd49521 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -166,7 +166,7 @@ protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) thr reader.close(); throw new SQLException("Called method expects empty or filled result set but query has returned none. Consider using `java.sql.Statement.execute(java.lang.String)`", ExceptionUtils.SQL_STATE_CLIENT_ERROR); } - return new ResultSetImpl(this, response, reader); + return new ResultSetImpl(this, response, reader, this::handleSocketTimeoutException); } catch (Exception e) { if (response != null) { try { @@ -635,9 +635,4 @@ public long executeLargeUpdate(String sql, String[] columnNames) throws SQLExcep public String getLastQueryId() { return lastQueryId; } - - // Proxy method for child objects. Do not call. - public void onNetworkTimeout() throws SQLException { - this.connection.onNetworkTimeout(); - } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java index d9d780395..3a86591ea 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java @@ -298,14 +298,4 @@ public static List convertList(List values, Class type) throw } return convertedValues; } - - public static void safeClose(AutoCloseable closeable, Logger logger) { - if (closeable != null) { - try { - closeable.close(); - } catch (Exception ex) { - logger.warn("Failed to close closeable after exception", ex); - } - } - } } From 2c7976b42c20cfe17f444e371232434468d09723 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 25 Aug 2025 23:50:45 -0700 Subject: [PATCH 6/6] improved code and added one more test --- .../com/clickhouse/jdbc/ConnectionImpl.java | 12 +++------ .../com/clickhouse/jdbc/StatementImpl.java | 6 +---- .../com/clickhouse/jdbc/ConnectionTest.java | 27 +++++++++++++++++-- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java index 5ac22198c..69cbc9a7f 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java @@ -2,15 +2,11 @@ import com.clickhouse.client.api.Client; import com.clickhouse.client.api.ClientConfigProperties; -import com.clickhouse.client.api.ClientException; -import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader; import com.clickhouse.client.api.internal.ServerSettings; import com.clickhouse.client.api.metadata.TableSchema; import com.clickhouse.client.api.query.GenericRecord; -import com.clickhouse.client.api.query.QueryResponse; import com.clickhouse.client.api.query.QuerySettings; import com.clickhouse.data.ClickHouseDataType; -import com.clickhouse.data.ClickHouseFormat; import com.clickhouse.jdbc.internal.ExceptionUtils; import com.clickhouse.jdbc.internal.JdbcConfiguration; import com.clickhouse.jdbc.internal.JdbcUtils; @@ -20,7 +16,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.sql.Array; import java.sql.Blob; import java.sql.CallableStatement; @@ -49,7 +44,6 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class ConnectionImpl implements Connection, JdbcV2Wrapper { @@ -643,9 +637,9 @@ public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLExc // Should be called by child object to notify about timeout. - public void onNetworkTimeout() throws SQLException { - if (isClosed() || networkTimeoutExecutor == null) { - return; // we closed already so do nothing. + public synchronized void onNetworkTimeout() { + if (this.closed || networkTimeoutExecutor == null) { + return; // we closed already or have not set network timeout so do nothing. } networkTimeoutExecutor.execute(() -> { diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index 42bd49521..4f167f997 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -183,11 +183,7 @@ protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) thr protected void handleSocketTimeoutException(Exception e) { if (e.getCause() instanceof SocketTimeoutException || e instanceof SocketTimeoutException) { - try { - this.connection.onNetworkTimeout(); - } catch (SQLException e1) { - LOG.warn("Failed to handle network timeout exception", e1); - } + this.connection.onNetworkTimeout(); } } diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java index 6bbfc4814..a742c811a 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static org.testng.Assert.assertEquals; @@ -425,10 +426,32 @@ public void testNetworkTimeout() throws Exception { Assert.assertTrue(e.getMessage().contains("closed")); } } - try { - } catch (Exception e) { + Properties connConfig = new Properties(); + connConfig.setProperty(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), "10"); + try (Connection conn = getJdbcConnection(connConfig)) { + Statement stmt = conn.createStatement(); + try (ResultSet rs = stmt.executeQuery("SELECT sleepEachRow(1) FROM system.numbers LIMIT 2")) { + fail("Exception expected"); + } catch (Exception e) { + Assert.assertFalse(conn.isClosed()); + Assert.assertTrue(conn.isValid(1000)); + } + } + try (Connection conn = getJdbcConnection(connConfig)) { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + conn.setNetworkTimeout(executorService, 10); + try (Statement stmt1 = conn.createStatement(); Statement stmt2 = conn.createStatement()) { + ScheduledExecutorService stmtExecutor = Executors.newScheduledThreadPool(2); + long t1 = System.currentTimeMillis(); + stmtExecutor.schedule(() -> stmt1.executeQuery("SELECT sleepEachRow(1) FROM system.numbers LIMIT 2"), 100, TimeUnit.MILLISECONDS); + long t2 = System.currentTimeMillis() - t1; + stmtExecutor.schedule(() -> stmt2.executeQuery("SELECT sleepEachRow(1) FROM system.numbers LIMIT 1"), 100 - t2, TimeUnit.MILLISECONDS); + + stmtExecutor.shutdown(); + stmtExecutor.awaitTermination(10, TimeUnit.SECONDS); + } } }