Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseFormat;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.http.ClassicHttpResponse;
Expand Down Expand Up @@ -185,31 +184,25 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
} else {
this.lz4Factory = LZ4Factory.fastestJavaInstance();
}

this.serverVersion = configuration.getOrDefault(ClientConfigProperties.SERVER_VERSION.getKey(), "unknown");
}

/**
* Loads essential information about a server. Should be called after client creation.
*
*/
public void loadServerInfo() {
// only if 2 properties are set disable retrieval from server
if (!this.configuration.containsKey(ClientConfigProperties.SERVER_TIMEZONE.getKey()) && !this.configuration.containsKey(ClientConfigProperties.SERVER_VERSION.getKey())) {
try (QueryResponse response = this.query("SELECT currentUser() AS user, timezone() AS timezone, version() AS version LIMIT 1").get()) {
try (ClickHouseBinaryFormatReader reader = this.newBinaryFormatReader(response)) {
if (reader.next() != null) {
this.configuration.put(ClientConfigProperties.USER.getKey(), reader.getString("user"));
this.configuration.put(ClientConfigProperties.SERVER_TIMEZONE.getKey(), reader.getString("timezone"));
serverVersion = reader.getString("version");
}
try (QueryResponse response = this.query("SELECT currentUser() AS user, timezone() AS timezone, version() AS version LIMIT 1").get()) {
try (ClickHouseBinaryFormatReader reader = this.newBinaryFormatReader(response)) {
if (reader.next() != null) {
this.configuration.put(ClientConfigProperties.USER.getKey(), reader.getString("user"));
this.configuration.put(ClientConfigProperties.SERVER_TIMEZONE.getKey(), reader.getString("timezone"));
serverVersion = reader.getString("version");
}
} catch (Exception e) {
throw new ClientException("Failed to get server info", e);
}
} else {
LOG.info("Using server version " + this.configuration.get(ClientConfigProperties.SERVER_VERSION.getKey()) + " and timezone " + this.configuration.get(ClientConfigProperties.SERVER_TIMEZONE.getKey()) );
if (this.configuration.containsKey(ClientConfigProperties.SERVER_VERSION.getKey())) {
serverVersion = this.configuration.get(ClientConfigProperties.SERVER_VERSION.getKey());
}
} catch (Exception e) {
throw new ClientException("Failed to get server info", e);
}
}

Expand Down Expand Up @@ -1213,8 +1206,11 @@ public boolean ping() {
*/
public boolean ping(long timeout) {
long startTime = System.nanoTime();
try (QueryResponse response = query("SELECT 1 FORMAT TabSeparated").get(timeout, TimeUnit.MILLISECONDS)) {
return true;
try {
CompletableFuture<QueryResponse> future = query("SELECT 1 FORMAT TabSeparated");
try (QueryResponse response = timeout > 0 ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get()) {
return true;
}
} catch (Exception e) {
LOG.debug("Failed to connect to the server (Duration: {})", System.nanoTime() - startTime, e);
return false;
Expand Down Expand Up @@ -2153,7 +2149,11 @@ private void applyDefaults(QuerySettings settings) {

private <T> CompletableFuture<T> runAsyncOperation(Supplier<T> resultSupplier, Map<String, Object> requestSettings) {
boolean isAsync = MapUtils.getFlag(requestSettings, configuration, ClientConfigProperties.ASYNC_OPERATIONS.getKey());
return isAsync ? CompletableFuture.supplyAsync(resultSupplier, sharedOperationExecutor) : CompletableFuture.completedFuture(resultSupplier.get());
if (isAsync) {
return sharedOperationExecutor == null ? CompletableFuture.supplyAsync(resultSupplier) :
CompletableFuture.supplyAsync(resultSupplier, sharedOperationExecutor);
}
return CompletableFuture.completedFuture(resultSupplier.get());
}

@Override
Expand Down
69 changes: 39 additions & 30 deletions client-v2/src/test/java/com/clickhouse/client/ClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public class ClientTests extends BaseIntegrationTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientTests.class);

@Test(dataProvider = "clientProvider")
@Test(groups = {"integration"}, dataProvider = "secureClientProvider")
public void testAddSecureEndpoint(Client client) {
if (isCloud()) {
return; // will fail in other tests
Expand All @@ -52,31 +52,35 @@ public void testAddSecureEndpoint(Client client) {
}
}

@DataProvider(name = "clientProvider")
private static Client[] secureClientProvider() throws Exception {
@DataProvider
public static Object[][] secureClientProvider() throws Exception {
ClickHouseNode node = ClickHouseServerForTest.getClickHouseNode(ClickHouseProtocol.HTTP,
true, ClickHouseNode.builder()
.addOption(ClickHouseClientOption.SSL_MODE.getKey(), "none")
.addOption(ClickHouseClientOption.SSL.getKey(), "true").build());
return new Client[]{
new Client.Builder()
.addEndpoint("https://" + node.getHost() + ":" + node.getPort())
.setUsername("default")
.setPassword("")
.setRootCertificate("containers/clickhouse-server/certs/localhost.crt")
.build(),
new Client.Builder()
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), true)
.setUsername("default")
.setPassword("")
.setRootCertificate("containers/clickhouse-server/certs/localhost.crt")
.setClientKey("user.key")
.setClientCertificate("user.crt")
.build()
return new Client[][]{
{
new Client.Builder()
.addEndpoint("https://" + node.getHost() + ":" + node.getPort())
.setUsername("default")
.setPassword("")
.setRootCertificate("containers/clickhouse-server/certs/localhost.crt")
.build()
},
{
new Client.Builder()
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), true)
.setUsername("default")
.setPassword("")
.setRootCertificate("containers/clickhouse-server/certs/localhost.crt")
.setClientKey("user.key")
.setClientCertificate("user.crt")
.build()
}
};
}

@Test
@Test(groups = {"integration"})
public void testRawSettings() {
Client client = newClient()
.setOption("custom_setting_1", "value_1")
Expand All @@ -102,33 +106,39 @@ public void testRawSettings() {
}
}

@Test
@Test(groups = {"integration"})
public void testPing() {
try (Client client = newClient().build()) {
Assert.assertTrue(client.ping());
}
}

@Test
@Test(groups = {"integration"})
public void testPingUnpooled() {
try (Client client = newClient().enableConnectionPool(false).build()) {
Assert.assertTrue(client.ping());
}
}

@Test
@Test(groups = {"integration"})
public void testPingFailure() {
try (Client client = new Client.Builder()
.addEndpoint("http://localhost:12345")
.setUsername("default")
.setPassword("")
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
.build()) {
Assert.assertFalse(client.ping(TimeUnit.SECONDS.toMillis(20)));
}
}

@Test
@Test(groups = {"integration"})
public void testPingAsync() {
try (Client client = newClient().useAsyncRequests(true).build()) {
Assert.assertTrue(client.ping());
}
}

@Test(groups = {"integration"})
public void testSetOptions() {
Map<String, String> options = new HashMap<>();
String productName = "my product_name (version 1.0)";
Expand All @@ -140,7 +150,7 @@ public void testSetOptions() {
}
}

@Test
@Test(groups = {"integration"})
public void testProvidedExecutor() throws Exception {

ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand All @@ -159,19 +169,19 @@ public void testProvidedExecutor() throws Exception {
Assert.assertFalse(flag.get());
}

@Test
@Test(groups = {"integration"})
public void testLoadingServerContext() throws Exception {
long start = System.nanoTime();
try (Client client = newClient().build()) {
long initTime = (System.nanoTime() - start) / 1_000_000;
Assert.assertTrue(initTime < 100);
Assert.assertNull(client.getServerVersion());
Assert.assertEquals(client.getServerVersion(), "unknown");
client.loadServerInfo();
Assert.assertNotNull(client.getServerVersion());
}
}

@Test
@Test(groups = {"integration"})
public void testDisableNative() {
try (Client client = newClient().disableNativeCompression(true).build()) {
Assert.assertTrue(client.toString().indexOf("JavaUnsafe") != -1);
Expand All @@ -185,7 +195,6 @@ protected Client.Builder newClient() {
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure)
.setUsername("default")
.setPassword(ClickHouseServerForTest.getPassword())
.setDefaultDatabase(ClickHouseServerForTest.getDatabase())
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"));
.setDefaultDatabase(ClickHouseServerForTest.getDatabase());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void testConnectionRequestTimeout() {
}
}

@Test
@Test(groups = {"integration"})
public void testConnectionReuseStrategy() {
if (isCloud()) {
return; // mocked server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseServerForTest;
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.ClientConfigProperties;
import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.DataTypeUtils;
import com.clickhouse.client.api.command.CommandResponse;
Expand All @@ -26,6 +27,7 @@
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseVersion;
import com.clickhouse.data.format.BinaryStreamUtils;
import lombok.Data;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
Expand Down Expand Up @@ -269,6 +271,43 @@ public void insertRawData() throws Exception {
assertEquals(records.size(), 1000);
}

@Test(groups = { "integration" }, dataProvider = "insertRawDataAsyncProvider", dataProviderClass = InsertTests.class)
public void insertRawDataAsync(boolean async) throws Exception {
final String tableName = "raw_data_table_async";
final String createSQL = "CREATE TABLE " + tableName +
" (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String) ENGINE = MergeTree() ORDER BY ()";

initTable(tableName, createSQL);

InsertSettings localSettings = new InsertSettings(settings.getAllSettings());
localSettings.setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), async);
ByteArrayOutputStream data = new ByteArrayOutputStream();
PrintWriter writer = new PrintWriter(data);
for (int i = 0; i < 1000; i++) {
writer.printf("%d\t%s\t%s\t%d\t%s\n", i, "2021-01-01 00:00:00", "name" + i, i, "p2");
}
writer.flush();
client.insert(tableName, new ByteArrayInputStream(data.toByteArray()),
ClickHouseFormat.TSV, localSettings).whenComplete((response, throwable) -> {
OperationMetrics metrics = response.getMetrics();
assertEquals((int)response.getWrittenRows(), 1000 );

List<GenericRecord> records = client.queryAll("SELECT * FROM " + tableName);
assertEquals(records.size(), 1000);
assertTrue(Thread.currentThread().getName()
.startsWith(async ? "ForkJoinPool.commonPool" : "main"), "Threads starts with " + Thread.currentThread().getName());
})
.join(); // wait operation complete. only for tests
}

@DataProvider
public static Object[][] insertRawDataAsyncProvider(){
return new Object[][] {
{true}, // async
{false} // blocking
};
}

@Test(groups = { "integration" }, dataProvider = "insertRawDataSimpleDataProvider", dataProviderClass = InsertTests.class)
public void insertRawDataSimple(String tableName) throws Exception {
// final String tableName = "raw_data_table";
Expand Down Expand Up @@ -639,10 +678,9 @@ public void testCollectionInsert() throws Exception {
}
}


static {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
}
// static {
// System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
// }

@Test(groups = {"integration"}, dataProvider = "testAppCompressionDataProvider", dataProviderClass = InsertTests.class)
public void testAppCompression(String algo) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ public ConnectionImpl(String url, Properties info) throws SQLException {
this.client = this.config.applyClientProperties(new Client.Builder())
.setClientName(clientName)
.build();
this.client.loadServerInfo();
String serverTimezone = this.client.getServerTimeZone();
if (serverTimezone == null) {
// we cannot operate without timezone
this.client.loadServerInfo();
}
this.schema = client.getDefaultDatabase();
this.defaultQuerySettings = new QuerySettings()
.serverSetting(ServerSettings.ASYNC_INSERT, "0")
Expand Down
Loading