diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 06f7935c6..e468e5812 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -39,7 +39,6 @@ import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.data.ClickHouseColumn; import com.clickhouse.data.ClickHouseFormat; -import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.http.ClassicHttpResponse; @@ -185,6 +184,8 @@ private Client(Set endpoints, Map configuration, boolean } else { this.lz4Factory = LZ4Factory.fastestJavaInstance(); } + + this.serverVersion = configuration.getOrDefault(ClientConfigProperties.SERVER_VERSION.getKey(), "unknown"); } /** @@ -192,24 +193,16 @@ private Client(Set endpoints, Map configuration, boolean * */ public void loadServerInfo() { - // only if 2 properties are set disable retrieval from server - if (!this.configuration.containsKey(ClientConfigProperties.SERVER_TIMEZONE.getKey()) && !this.configuration.containsKey(ClientConfigProperties.SERVER_VERSION.getKey())) { - try (QueryResponse response = this.query("SELECT currentUser() AS user, timezone() AS timezone, version() AS version LIMIT 1").get()) { - try (ClickHouseBinaryFormatReader reader = this.newBinaryFormatReader(response)) { - if (reader.next() != null) { - this.configuration.put(ClientConfigProperties.USER.getKey(), reader.getString("user")); - this.configuration.put(ClientConfigProperties.SERVER_TIMEZONE.getKey(), reader.getString("timezone")); - serverVersion = reader.getString("version"); - } + try (QueryResponse response = this.query("SELECT currentUser() AS user, timezone() AS timezone, version() AS version LIMIT 1").get()) { + try (ClickHouseBinaryFormatReader reader = this.newBinaryFormatReader(response)) { + if (reader.next() != null) { + this.configuration.put(ClientConfigProperties.USER.getKey(), reader.getString("user")); + this.configuration.put(ClientConfigProperties.SERVER_TIMEZONE.getKey(), reader.getString("timezone")); + serverVersion = reader.getString("version"); } - } catch (Exception e) { - throw new ClientException("Failed to get server info", e); - } - } else { - LOG.info("Using server version " + this.configuration.get(ClientConfigProperties.SERVER_VERSION.getKey()) + " and timezone " + this.configuration.get(ClientConfigProperties.SERVER_TIMEZONE.getKey()) ); - if (this.configuration.containsKey(ClientConfigProperties.SERVER_VERSION.getKey())) { - serverVersion = this.configuration.get(ClientConfigProperties.SERVER_VERSION.getKey()); } + } catch (Exception e) { + throw new ClientException("Failed to get server info", e); } } @@ -1213,8 +1206,11 @@ public boolean ping() { */ public boolean ping(long timeout) { long startTime = System.nanoTime(); - try (QueryResponse response = query("SELECT 1 FORMAT TabSeparated").get(timeout, TimeUnit.MILLISECONDS)) { - return true; + try { + CompletableFuture future = query("SELECT 1 FORMAT TabSeparated"); + try (QueryResponse response = timeout > 0 ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get()) { + return true; + } } catch (Exception e) { LOG.debug("Failed to connect to the server (Duration: {})", System.nanoTime() - startTime, e); return false; @@ -2153,7 +2149,11 @@ private void applyDefaults(QuerySettings settings) { private CompletableFuture runAsyncOperation(Supplier resultSupplier, Map requestSettings) { boolean isAsync = MapUtils.getFlag(requestSettings, configuration, ClientConfigProperties.ASYNC_OPERATIONS.getKey()); - return isAsync ? CompletableFuture.supplyAsync(resultSupplier, sharedOperationExecutor) : CompletableFuture.completedFuture(resultSupplier.get()); + if (isAsync) { + return sharedOperationExecutor == null ? CompletableFuture.supplyAsync(resultSupplier) : + CompletableFuture.supplyAsync(resultSupplier, sharedOperationExecutor); + } + return CompletableFuture.completedFuture(resultSupplier.get()); } @Override diff --git a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java index 9f0aa0510..13fd0026f 100644 --- a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java @@ -27,7 +27,7 @@ public class ClientTests extends BaseIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(ClientTests.class); - @Test(dataProvider = "clientProvider") + @Test(groups = {"integration"}, dataProvider = "secureClientProvider") public void testAddSecureEndpoint(Client client) { if (isCloud()) { return; // will fail in other tests @@ -52,31 +52,35 @@ public void testAddSecureEndpoint(Client client) { } } - @DataProvider(name = "clientProvider") - private static Client[] secureClientProvider() throws Exception { + @DataProvider + public static Object[][] secureClientProvider() throws Exception { ClickHouseNode node = ClickHouseServerForTest.getClickHouseNode(ClickHouseProtocol.HTTP, true, ClickHouseNode.builder() .addOption(ClickHouseClientOption.SSL_MODE.getKey(), "none") .addOption(ClickHouseClientOption.SSL.getKey(), "true").build()); - return new Client[]{ - new Client.Builder() - .addEndpoint("https://" + node.getHost() + ":" + node.getPort()) - .setUsername("default") - .setPassword("") - .setRootCertificate("containers/clickhouse-server/certs/localhost.crt") - .build(), - new Client.Builder() - .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), true) - .setUsername("default") - .setPassword("") - .setRootCertificate("containers/clickhouse-server/certs/localhost.crt") - .setClientKey("user.key") - .setClientCertificate("user.crt") - .build() + return new Client[][]{ + { + new Client.Builder() + .addEndpoint("https://" + node.getHost() + ":" + node.getPort()) + .setUsername("default") + .setPassword("") + .setRootCertificate("containers/clickhouse-server/certs/localhost.crt") + .build() + }, + { + new Client.Builder() + .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), true) + .setUsername("default") + .setPassword("") + .setRootCertificate("containers/clickhouse-server/certs/localhost.crt") + .setClientKey("user.key") + .setClientCertificate("user.crt") + .build() + } }; } - @Test + @Test(groups = {"integration"}) public void testRawSettings() { Client client = newClient() .setOption("custom_setting_1", "value_1") @@ -102,33 +106,39 @@ public void testRawSettings() { } } - @Test + @Test(groups = {"integration"}) public void testPing() { try (Client client = newClient().build()) { Assert.assertTrue(client.ping()); } } - @Test + @Test(groups = {"integration"}) public void testPingUnpooled() { try (Client client = newClient().enableConnectionPool(false).build()) { Assert.assertTrue(client.ping()); } } - @Test + @Test(groups = {"integration"}) public void testPingFailure() { try (Client client = new Client.Builder() .addEndpoint("http://localhost:12345") .setUsername("default") .setPassword("") - .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) .build()) { Assert.assertFalse(client.ping(TimeUnit.SECONDS.toMillis(20))); } } - @Test + @Test(groups = {"integration"}) + public void testPingAsync() { + try (Client client = newClient().useAsyncRequests(true).build()) { + Assert.assertTrue(client.ping()); + } + } + + @Test(groups = {"integration"}) public void testSetOptions() { Map options = new HashMap<>(); String productName = "my product_name (version 1.0)"; @@ -140,7 +150,7 @@ public void testSetOptions() { } } - @Test + @Test(groups = {"integration"}) public void testProvidedExecutor() throws Exception { ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -159,19 +169,19 @@ public void testProvidedExecutor() throws Exception { Assert.assertFalse(flag.get()); } - @Test + @Test(groups = {"integration"}) public void testLoadingServerContext() throws Exception { long start = System.nanoTime(); try (Client client = newClient().build()) { long initTime = (System.nanoTime() - start) / 1_000_000; Assert.assertTrue(initTime < 100); - Assert.assertNull(client.getServerVersion()); + Assert.assertEquals(client.getServerVersion(), "unknown"); client.loadServerInfo(); Assert.assertNotNull(client.getServerVersion()); } } - @Test + @Test(groups = {"integration"}) public void testDisableNative() { try (Client client = newClient().disableNativeCompression(true).build()) { Assert.assertTrue(client.toString().indexOf("JavaUnsafe") != -1); @@ -185,7 +195,6 @@ protected Client.Builder newClient() { .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) .setUsername("default") .setPassword(ClickHouseServerForTest.getPassword()) - .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) - .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true")); + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()); } } diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 0de89ef15..1a31d325e 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -203,7 +203,7 @@ public void testConnectionRequestTimeout() { } } - @Test + @Test(groups = {"integration"}) public void testConnectionReuseStrategy() { if (isCloud()) { return; // mocked server diff --git a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java index e4aac9888..d3f0f674e 100644 --- a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java @@ -5,6 +5,7 @@ import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.ClickHouseServerForTest; import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientConfigProperties; import com.clickhouse.client.api.ClientException; import com.clickhouse.client.api.DataTypeUtils; import com.clickhouse.client.api.command.CommandResponse; @@ -26,6 +27,7 @@ import com.clickhouse.data.ClickHouseFormat; import com.clickhouse.data.ClickHouseVersion; import com.clickhouse.data.format.BinaryStreamUtils; +import lombok.Data; import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; import net.jpountz.lz4.LZ4SafeDecompressor; @@ -269,6 +271,43 @@ public void insertRawData() throws Exception { assertEquals(records.size(), 1000); } + @Test(groups = { "integration" }, dataProvider = "insertRawDataAsyncProvider", dataProviderClass = InsertTests.class) + public void insertRawDataAsync(boolean async) throws Exception { + final String tableName = "raw_data_table_async"; + final String createSQL = "CREATE TABLE " + tableName + + " (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String) ENGINE = MergeTree() ORDER BY ()"; + + initTable(tableName, createSQL); + + InsertSettings localSettings = new InsertSettings(settings.getAllSettings()); + localSettings.setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), async); + ByteArrayOutputStream data = new ByteArrayOutputStream(); + PrintWriter writer = new PrintWriter(data); + for (int i = 0; i < 1000; i++) { + writer.printf("%d\t%s\t%s\t%d\t%s\n", i, "2021-01-01 00:00:00", "name" + i, i, "p2"); + } + writer.flush(); + client.insert(tableName, new ByteArrayInputStream(data.toByteArray()), + ClickHouseFormat.TSV, localSettings).whenComplete((response, throwable) -> { + OperationMetrics metrics = response.getMetrics(); + assertEquals((int)response.getWrittenRows(), 1000 ); + + List records = client.queryAll("SELECT * FROM " + tableName); + assertEquals(records.size(), 1000); + assertTrue(Thread.currentThread().getName() + .startsWith(async ? "ForkJoinPool.commonPool" : "main"), "Threads starts with " + Thread.currentThread().getName()); + }) + .join(); // wait operation complete. only for tests + } + + @DataProvider + public static Object[][] insertRawDataAsyncProvider(){ + return new Object[][] { + {true}, // async + {false} // blocking + }; + } + @Test(groups = { "integration" }, dataProvider = "insertRawDataSimpleDataProvider", dataProviderClass = InsertTests.class) public void insertRawDataSimple(String tableName) throws Exception { // final String tableName = "raw_data_table"; @@ -639,10 +678,9 @@ public void testCollectionInsert() throws Exception { } } - - static { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); - } +// static { +// System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); +// } @Test(groups = {"integration"}, dataProvider = "testAppCompressionDataProvider", dataProviderClass = InsertTests.class) public void testAppCompression(String algo) throws Exception { diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java index def76f034..86dd1a5bf 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java @@ -95,7 +95,11 @@ public ConnectionImpl(String url, Properties info) throws SQLException { this.client = this.config.applyClientProperties(new Client.Builder()) .setClientName(clientName) .build(); - this.client.loadServerInfo(); + String serverTimezone = this.client.getServerTimeZone(); + if (serverTimezone == null) { + // we cannot operate without timezone + this.client.loadServerInfo(); + } this.schema = client.getDefaultDatabase(); this.defaultQuerySettings = new QuerySettings() .serverSetting(ServerSettings.ASYNC_INSERT, "0")