diff --git a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java index 7cab9bb8c..29e24d508 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java @@ -5,6 +5,7 @@ import com.clickhouse.client.api.internal.CommonSettings; import org.apache.hc.core5.http.HttpHeaders; +import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.Map; @@ -27,6 +28,11 @@ public InsertSettings(Map settings) { } } + private InsertSettings(CommonSettings settings) { + this.settings = settings; + setDefaults(); + } + private void setDefaults() {// Default settings, for now a very small list this.setInputStreamCopyBufferSize(DEFAULT_INPUT_STREAM_BATCH_SIZE); } @@ -274,4 +280,28 @@ public InsertSettings logComment(String logComment) { public String getLogComment() { return settings.getLogComment(); } + + public static InsertSettings merge(InsertSettings source, InsertSettings override) { + CommonSettings mergedSettings = source.settings.copyAndMerge(override.settings); + InsertSettings insertSettings = new InsertSettings(mergedSettings); + insertSettings.setInputStreamCopyBufferSize(override.getInputStreamCopyBufferSize()); + return insertSettings; + } + + /** + * Sets a network operation timeout. + * @param timeout + * @param unit + */ + public void setNetworkTimeout(long timeout, ChronoUnit unit) { + settings.setNetworkTimeout(timeout, unit); + } + + /** + * Returns network timeout. Zero value is returned if no timeout is set. + * @return timeout in ms. + */ + public Long getNetworkTimeout() { + return settings.getNetworkTimeout(); + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java index d703570b7..72014f632 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java @@ -3,6 +3,8 @@ import com.clickhouse.client.api.Client; import com.clickhouse.client.api.ClientConfigProperties; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -217,6 +219,25 @@ public String getLogComment() { return logComment; } + /** + * Sets a network operation timeout. + * @param timeout + * @param unit + */ + public void setNetworkTimeout(long timeout, ChronoUnit unit) { + settings.put(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), Duration.of(timeout, unit).toMillis()); + } + + /** + * Returns network timeout. Zero value is returned if no timeout is set. + * @return timeout in ms. + */ + public Long getNetworkTimeout() { + return (Long) getOption(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), + ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getDefaultValue()); + } + + public CommonSettings copyAndMerge(CommonSettings override) { CommonSettings copy = new CommonSettings(); copy.settings.putAll(settings); diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 2ed56cbbb..5ce5feca4 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -110,8 +110,6 @@ public class HttpAPIClientHelper { private final CloseableHttpClient httpClient; - private final RequestConfig baseRequestConfig; - private String proxyAuthHeaderValue; private final Set defaultRetryCauses; @@ -125,11 +123,6 @@ public HttpAPIClientHelper(Map configuration, Object metricsRegi this.metricsRegistry = metricsRegistry; this.httpClient = createHttpClient(initSslContext, configuration); - RequestConfig.Builder reqConfBuilder = RequestConfig.custom(); - reqConfBuilder.setConnectionRequestTimeout(ClientConfigProperties.CONNECTION_REQUEST_TIMEOUT.getOrDefault(configuration), TimeUnit.MILLISECONDS); - - this.baseRequestConfig = reqConfBuilder.build(); - boolean usingClientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(configuration); boolean usingServerCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(configuration); boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(configuration); @@ -438,12 +431,19 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig); - req.setConfig(baseRequestConfig); + // setting entity. wrapping if compression is enabled req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback), clientCompression, useHttpCompression, appCompressedData, lz4Factory, requestConfig)); HttpClientContext context = HttpClientContext.create(); + Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig); + Number connectionReqTimeout = ClientConfigProperties.CONNECTION_REQUEST_TIMEOUT.getOrDefault(requestConfig); + RequestConfig reqHttpConf = RequestConfig.custom() + .setResponseTimeout(responseTimeout.longValue(), TimeUnit.MILLISECONDS) + .setConnectionRequestTimeout(connectionReqTimeout.longValue(), TimeUnit.MILLISECONDS) + .build(); + context.setRequestConfig(reqHttpConf); ClassicHttpResponse httpResponse = null; try { diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java b/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java index e1b8cbce9..1ade90c3d 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java @@ -8,8 +8,8 @@ import com.clickhouse.client.api.internal.ValidationUtils; import com.clickhouse.data.ClickHouseFormat; +import java.time.temporal.ChronoUnit; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.TimeZone; @@ -269,6 +269,23 @@ public String getLogComment() { return settings.getLogComment(); } + /** + * Sets a network operation timeout. + * @param timeout + * @param unit + */ + public void setNetworkTimeout(long timeout, ChronoUnit unit) { + settings.setNetworkTimeout(timeout, unit); + } + + /** + * Returns network timeout. Zero value is returned if no timeout is set. + * @return timeout in ms. + */ + public Long getNetworkTimeout() { + return settings.getNetworkTimeout(); + } + public static QuerySettings merge(QuerySettings source, QuerySettings override) { CommonSettings mergedSettings = source.settings.copyAndMerge(override.settings); return new QuerySettings(mergedSettings); diff --git a/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java b/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java index f4331ae6d..bb7dcd8d3 100644 --- a/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java @@ -6,9 +6,11 @@ import org.testng.Assert; import org.testng.annotations.Test; +import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; @Test(groups = {"unit"}) public class SettingsTests { @@ -22,15 +24,28 @@ void testClientSettings() { } @Test - void testMergeQuerySettings() { - QuerySettings settings1 = new QuerySettings().setQueryId("test1").httpHeader("key1", "value1"); - QuerySettings settings2 = new QuerySettings().httpHeader("key1", "value2"); + void testMergeSettings() { + { + QuerySettings settings1 = new QuerySettings().setQueryId("test1").httpHeader("key1", "value1"); + QuerySettings settings2 = new QuerySettings().httpHeader("key1", "value2"); + + QuerySettings merged = QuerySettings.merge(settings1, settings2); + Assert.assertNotSame(merged, settings1); + Assert.assertNotSame(merged, settings2); - QuerySettings merged = QuerySettings.merge(settings1, settings2); - Assert.assertNotSame(merged, settings1); - Assert.assertNotSame(merged, settings2); + Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2"); + } + { + InsertSettings settings1 = new InsertSettings().setQueryId("test1").httpHeader("key1", "value1"); + InsertSettings settings2 = new InsertSettings().httpHeader("key1", "value2").setInputStreamCopyBufferSize(200000); - Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2"); + InsertSettings merged = InsertSettings.merge(settings1, settings2); + Assert.assertNotSame(merged, settings1); + Assert.assertNotSame(merged, settings2); + + Assert.assertEquals(merged.getInputStreamCopyBufferSize(), settings2.getInputStreamCopyBufferSize()); + Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2"); + } } @Test @@ -87,6 +102,12 @@ void testQuerySettingsSpecific() throws Exception { settings.logComment(null); Assert.assertNull(settings.getLogComment()); } + + { + final QuerySettings settings = new QuerySettings(); + settings.setNetworkTimeout(10, ChronoUnit.SECONDS); + Assert.assertEquals(settings.getNetworkTimeout(), TimeUnit.SECONDS.toMillis(10)); + } } @Test @@ -140,5 +161,11 @@ public void testInsertSettingsSpecific() throws Exception { settings.logComment(null); Assert.assertNull(settings.getLogComment()); } + + { + final InsertSettings settings = new InsertSettings(); + settings.setNetworkTimeout(10, ChronoUnit.SECONDS); + Assert.assertEquals(settings.getNetworkTimeout(), TimeUnit.SECONDS.toMillis(10)); + } } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java index a38515944..69cbc9a7f 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java @@ -34,6 +34,7 @@ import java.sql.Statement; import java.sql.Struct; import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Calendar; import java.util.Collections; @@ -46,7 +47,7 @@ import java.util.stream.Collectors; public class ConnectionImpl implements Connection, JdbcV2Wrapper { - private static final Logger log = LoggerFactory.getLogger(ConnectionImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(ConnectionImpl.class); protected final String url; private final Client client; // this member is private to force using getClient() @@ -65,6 +66,8 @@ public class ConnectionImpl implements Connection, JdbcV2Wrapper { private final SqlParser sqlParser; + private Executor networkTimeoutExecutor; + public ConnectionImpl(String url, Properties info) throws SQLException { try { this.url = url;//Raw URL @@ -85,10 +88,10 @@ public ConnectionImpl(String url, Properties info) throws SQLException { } if (this.config.isDisableFrameworkDetection()) { - log.debug("Framework detection is disabled."); + LOG.debug("Framework detection is disabled."); } else { String detectedFrameworks = Driver.FrameworksDetection.getFrameworksDetected(); - log.debug("Detected frameworks: {}", detectedFrameworks); + LOG.debug("Detected frameworks: {}", detectedFrameworks); if (!detectedFrameworks.trim().isEmpty()) { clientName += " (" + detectedFrameworks + ")"; } @@ -213,9 +216,8 @@ public void close() throws SQLException { if (isClosed()) { return; } - - client.close(); - closed = true; + closed = true; // mark as closed to prevent further invocations + client.close(); // this will disrupt pending requests. } @Override @@ -600,27 +602,58 @@ public String getSchema() throws SQLException { @Override public void abort(Executor executor) throws SQLException { - if (!config.isIgnoreUnsupportedRequests()) { - throw new SQLFeatureNotSupportedException("abort not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED); + if (executor == null) { + throw new SQLException("Executor must be not null"); } + // This method should check permissions with SecurityManager but the one is deprecated. + // There is no replacement for SecurityManger and it is marked for removal. + this.close(); } @Override public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { - //TODO: Should this be supported? - if (!config.isIgnoreUnsupportedRequests()) { - throw new SQLFeatureNotSupportedException("setNetworkTimeout not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED); + ensureOpen(); + + // Very good mail thread about this method implementation. https://mail.openjdk.org/pipermail/jdbc-spec-discuss/2017-November/000236.html + + // This method should check permissions with SecurityManager but the one is deprecated. + // There is no replacement for SecurityManger and it is marked for removal. + if (milliseconds > 0 && executor == null) { + // we need executor only for positive timeout values. + throw new SQLException("Executor must be not null"); + } + if (milliseconds < 0) { + throw new SQLException("Timeout must be >= 0"); + } + + // How it should work: + // if timeout is set with this method then any timeout exception should be reported to the connection + // when connection get signal about timeout it uses executor to abort itself + // Some connection pools set timeout before calling Connection#close() to ensure that this operation will not hang + // Socket timeout is propagated with QuerySettings this connection has. + networkTimeoutExecutor = executor; + defaultQuerySettings.setNetworkTimeout(milliseconds, ChronoUnit.MILLIS); + } + + + // Should be called by child object to notify about timeout. + public synchronized void onNetworkTimeout() { + if (this.closed || networkTimeoutExecutor == null) { + return; // we closed already or have not set network timeout so do nothing. } + + networkTimeoutExecutor.execute(() -> { + try { + this.abort(networkTimeoutExecutor); + } catch (SQLException e) { + throw new RuntimeException("Failed to abort connection", e); + } + }); } @Override public int getNetworkTimeout() throws SQLException { - //TODO: Should this be supported? - if (!config.isIgnoreUnsupportedRequests()) { - throw new SQLFeatureNotSupportedException("getNetworkTimeout not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED); - } - - return -1; + return defaultQuerySettings.getNetworkTimeout().intValue(); } /** diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java index 4bcc48d4c..d7bf05525 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java @@ -16,6 +16,7 @@ import java.io.Reader; import java.io.StringReader; import java.math.BigDecimal; +import java.net.SocketTimeoutException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.sql.Blob; @@ -36,6 +37,7 @@ import java.time.ZonedDateTime; import java.util.Calendar; import java.util.Map; +import java.util.function.Consumer; public class ResultSetImpl implements ResultSet, JdbcV2Wrapper { private static final Logger log = LoggerFactory.getLogger(ResultSetImpl.class); @@ -56,7 +58,10 @@ public class ResultSetImpl implements ResultSet, JdbcV2Wrapper { private int fetchSize; - public ResultSetImpl(StatementImpl parentStatement, QueryResponse response, ClickHouseBinaryFormatReader reader) throws SQLException { + private Consumer onDataTransferException; + + public ResultSetImpl(StatementImpl parentStatement, QueryResponse response, ClickHouseBinaryFormatReader reader, + Consumer onDataTransferException) throws SQLException { this.parentStatement = parentStatement; this.response = response; this.reader = reader; @@ -72,17 +77,7 @@ public ResultSetImpl(StatementImpl parentStatement, QueryResponse response, Clic this.defaultCalendar = parentStatement.getConnection().defaultCalendar; this.rowPos = BEFORE_FIRST; this.fetchSize = parentStatement.getFetchSize(); - } - - protected ResultSetImpl(ResultSetImpl resultSet) throws SQLException{ - this.parentStatement = resultSet.parentStatement; - this.response = resultSet.response; - this.reader = resultSet.reader; - this.metaData = resultSet.metaData; - this.closed = false; - this.wasNull = false; - this.defaultCalendar = parentStatement.getConnection().defaultCalendar; - this.featureManager = new FeatureManager(parentStatement.getConnection().getJdbcConfig()); + this.onDataTransferException = onDataTransferException; } private void checkClosed() throws SQLException { @@ -117,6 +112,9 @@ public boolean next() throws SQLException { } return readerRow != null; } catch (Exception e) { + if (onDataTransferException != null) { + onDataTransferException.accept(e); + } throw ExceptionUtils.toSqlState(e); } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index 5f7cc70f5..4f167f997 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.SocketTimeoutException; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLWarning; @@ -165,7 +166,7 @@ protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) thr reader.close(); throw new SQLException("Called method expects empty or filled result set but query has returned none. Consider using `java.sql.Statement.execute(java.lang.String)`", ExceptionUtils.SQL_STATE_CLIENT_ERROR); } - return new ResultSetImpl(this, response, reader); + return new ResultSetImpl(this, response, reader, this::handleSocketTimeoutException); } catch (Exception e) { if (response != null) { try { @@ -174,11 +175,18 @@ protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) thr LOG.warn("Failed to close response after exception", e); } } + handleSocketTimeoutException(e); onResultSetClosed(null); throw ExceptionUtils.toSqlState(e); } } + protected void handleSocketTimeoutException(Exception e) { + if (e.getCause() instanceof SocketTimeoutException || e instanceof SocketTimeoutException) { + this.connection.onNetworkTimeout(); + } + } + @Override public int executeUpdate(String sql) throws SQLException { ensureOpen(); @@ -210,6 +218,7 @@ protected long executeUpdateImpl(String sql, QuerySettings settings) throws SQLE updateCount = Math.max(0, (int) response.getWrittenRows()); // when statement alters schema no result rows returned. lastQueryId = response.getQueryId(); } catch (Exception e) { + handleSocketTimeoutException(e); throw ExceptionUtils.toSqlState(e); } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java index 833a7f352..582065016 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java @@ -109,6 +109,7 @@ public long executeLargeUpdate() throws SQLException { try { writer.commitRow(); } catch (Exception e) { + handleSocketTimeoutException(e); throw new SQLException(e); } @@ -121,6 +122,7 @@ public long executeLargeUpdate() throws SQLException { updateCount = Math.max(0, (int) response.getWrittenRows()); // when statement alters schema no result rows returned. lastQueryId = response.getQueryId(); } catch (Exception e) { + handleSocketTimeoutException(e); throw ExceptionUtils.toSqlState(e); } finally { try { @@ -298,6 +300,7 @@ public void addBatch() throws SQLException { try { writer.commitRow(); } catch (Exception e) { + handleSocketTimeoutException(e); throw new SQLException(e); } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java index 9752d98ab..3a86591ea 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java @@ -6,6 +6,7 @@ import com.clickhouse.data.Tuple; import com.clickhouse.jdbc.types.Array; import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; import java.awt.*; import java.math.BigInteger; diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java index 42374f757..a742c811a 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/ConnectionTest.java @@ -28,10 +28,15 @@ import java.util.Base64; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; public class ConnectionTest extends JdbcIntegrationTest { @@ -387,20 +392,67 @@ public void setSchemaTest() throws SQLException { @Test(groups = { "integration" }) public void abortTest() throws SQLException { - Connection localConnection = this.getJdbcConnection(); - assertThrows(SQLFeatureNotSupportedException.class, () -> localConnection.abort(null)); + try (Connection conn = this.getJdbcConnection()) { + conn.abort(Executors.newSingleThreadExecutor()); + assertTrue(conn.isClosed()); + } } - @Test(groups = { "integration" }) - public void setNetworkTimeoutTest() throws SQLException { - Connection localConnection = this.getJdbcConnection(); - assertThrows(SQLFeatureNotSupportedException.class, () -> localConnection.setNetworkTimeout(null, 0)); - } + @Test(groups = {"integration"}) + public void testNetworkTimeout() throws Exception { + try (Connection conn = this.getJdbcConnection()) { + Assert.assertThrows(SQLException.class, () -> conn.setNetworkTimeout(null, 1000)); + Assert.assertThrows(SQLException.class, () -> conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), -1)); + + int timeout = 10; + ExecutorService executorService = Executors.newSingleThreadExecutor(); + conn.setNetworkTimeout(executorService, timeout); + Assert.assertEquals(conn.getNetworkTimeout(), timeout); + Statement stmt = conn.createStatement(); + try (ResultSet rs = stmt.executeQuery("SELECT sleepEachRow(1) FROM system.numbers LIMIT 2")) { + fail("Exception expected"); + } catch (Exception e) { + executorService.shutdown(); + executorService.awaitTermination(20, TimeUnit.SECONDS); + Assert.assertTrue(conn.isClosed()); + Assert.assertFalse(conn.isValid(1000)); + conn.close(); - @Test(groups = { "integration" }) - public void getNetworkTimeoutTest() throws SQLException { - Connection localConnection = this.getJdbcConnection(); - assertThrows(SQLFeatureNotSupportedException.class, localConnection::getNetworkTimeout); + } + + try { + stmt.executeQuery("SELECT 1"); + } catch (SQLException e) { + Assert.assertTrue(e.getMessage().contains("closed")); + } + } + + Properties connConfig = new Properties(); + connConfig.setProperty(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), "10"); + try (Connection conn = getJdbcConnection(connConfig)) { + Statement stmt = conn.createStatement(); + try (ResultSet rs = stmt.executeQuery("SELECT sleepEachRow(1) FROM system.numbers LIMIT 2")) { + fail("Exception expected"); + } catch (Exception e) { + Assert.assertFalse(conn.isClosed()); + Assert.assertTrue(conn.isValid(1000)); + } + } + + try (Connection conn = getJdbcConnection(connConfig)) { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + conn.setNetworkTimeout(executorService, 10); + try (Statement stmt1 = conn.createStatement(); Statement stmt2 = conn.createStatement()) { + ScheduledExecutorService stmtExecutor = Executors.newScheduledThreadPool(2); + long t1 = System.currentTimeMillis(); + stmtExecutor.schedule(() -> stmt1.executeQuery("SELECT sleepEachRow(1) FROM system.numbers LIMIT 2"), 100, TimeUnit.MILLISECONDS); + long t2 = System.currentTimeMillis() - t1; + stmtExecutor.schedule(() -> stmt2.executeQuery("SELECT sleepEachRow(1) FROM system.numbers LIMIT 1"), 100 - t2, TimeUnit.MILLISECONDS); + + stmtExecutor.shutdown(); + stmtExecutor.awaitTermination(10, TimeUnit.SECONDS); + } + } } @Test(groups = { "integration" })