Skip to content
10 changes: 7 additions & 3 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public class Client implements AutoCloseable {
private final Map<ClickHouseDataType, Class<?>> typeHintMapping;

// Server context
private String dbUser;
private String serverVersion;
private Object metricsRegistry;
private int retries;
Expand Down Expand Up @@ -196,7 +197,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
}

this.serverVersion = configuration.getOrDefault(ClientConfigProperties.SERVER_VERSION.getKey(), "unknown");

this.dbUser = configuration.getOrDefault(ClientConfigProperties.USER.getKey(), ClientConfigProperties.USER.getDefObjVal());
this.typeHintMapping = (Map<ClickHouseDataType, Class<?>>) this.configuration.get(ClientConfigProperties.TYPE_HINT_MAPPING.getKey());
}

Expand All @@ -208,7 +209,10 @@ public void loadServerInfo() {
try (QueryResponse response = this.query("SELECT currentUser() AS user, timezone() AS timezone, version() AS version LIMIT 1").get()) {
try (ClickHouseBinaryFormatReader reader = this.newBinaryFormatReader(response)) {
if (reader.next() != null) {
this.configuration.put(ClientConfigProperties.USER.getKey(), reader.getString("user"));
String tmpDbUser = reader.getString("user");
if (tmpDbUser != null && !tmpDbUser.isEmpty()) {
this.dbUser = tmpDbUser;
}
this.configuration.put(ClientConfigProperties.SERVER_TIMEZONE.getKey(), reader.getString("timezone"));
serverVersion = reader.getString("version");
}
Expand Down Expand Up @@ -2041,7 +2045,7 @@ public Set<String> getEndpoints() {
}

public String getUser() {
return (String) this.configuration.get(ClientConfigProperties.USER.getKey());
return dbUser;
}

public String getServerVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ public void setNetworkTimeout(long timeout, ChronoUnit unit) {
* @return timeout in ms.
*/
public Long getNetworkTimeout() {
return (Long) getOption(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(),
ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getDefaultValue());
// Socket operation timeout must be integer because of OS interface
// Network timeout may be something else in the future. So we need to cast it to Long.
return ((Number) getOption(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(),
ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getDefObjVal())).longValue();
}


Expand Down
31 changes: 31 additions & 0 deletions client-v2/src/test/java/com/clickhouse/client/ClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@
import com.clickhouse.client.api.ConnectionReuseStrategy;
import com.clickhouse.client.api.command.CommandResponse;
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
import com.clickhouse.client.api.internal.ServerSettings;
import com.clickhouse.client.api.metadata.DefaultColumnToMethodMatchingStrategy;
import com.clickhouse.client.api.query.GenericRecord;
import com.clickhouse.client.api.query.QueryResponse;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.client.api.query.Records;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.query.QueryTests;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseVersion;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
Expand All @@ -24,6 +28,7 @@
import org.testng.annotations.Test;
import org.testng.util.Strings;

import java.io.ByteArrayInputStream;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -412,6 +417,32 @@ public void testLogComment() throws Exception {
}
}

@Test(groups = {"integration"})
public void testServerSettings() throws Exception {
try (Client client = newClient().build()) {
client.execute("DROP TABLE IF EXISTS server_settings_test_table");
client.execute("CREATE TABLE server_settings_test_table (v Float) Engine MergeTree ORDER BY ()");

final String queryId = UUID.randomUUID().toString();
InsertSettings insertSettings = new InsertSettings()
.setQueryId(queryId)
.serverSetting(ServerSettings.ASYNC_INSERT, "1")
.serverSetting(ServerSettings.WAIT_ASYNC_INSERT, "1");

String csvData = "0.33\n0.44\n0.55\n";
client.insert("server_settings_test_table", new ByteArrayInputStream(csvData.getBytes()), ClickHouseFormat.CSV, insertSettings).get().close();

client.execute("SYSTEM FLUSH LOGS").get().close();

List<GenericRecord> logRecords = client.queryAll("SELECT * FROM clusterAllReplicas('default', system.query_log) WHERE query_id = '" + queryId + "' AND type = 'QueryFinish'");

GenericRecord record = logRecords.get(0);
String settings = record.getString(record.getSchema().nameToColumnIndex("Settings"));
Assert.assertTrue(settings.contains(ServerSettings.ASYNC_INSERT + "=1"));
// Assert.assertTrue(settings.contains(ServerSettings.WAIT_ASYNC_INSERT + "=1")); // uncomment after server fix
}
}

public boolean isVersionMatch(String versionExpression, Client client) {
List<GenericRecord> serverVersion = client.queryAll("SELECT version()");
return ClickHouseVersion.of(serverVersion.get(0).getString(1)).check(versionExpression);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ void testQuerySettingsSpecific() throws Exception {

{
final QuerySettings settings = new QuerySettings();
Assert.assertEquals(settings.getNetworkTimeout().intValue(),
(Integer) ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getDefObjVal());
settings.setNetworkTimeout(10, ChronoUnit.SECONDS);
Assert.assertEquals(settings.getNetworkTimeout(), TimeUnit.SECONDS.toMillis(10));
}
Expand Down Expand Up @@ -164,6 +166,8 @@ public void testInsertSettingsSpecific() throws Exception {

{
final InsertSettings settings = new InsertSettings();
Assert.assertEquals(settings.getNetworkTimeout().intValue(),
(Integer) ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getDefObjVal());
settings.setNetworkTimeout(10, ChronoUnit.SECONDS);
Assert.assertEquals(settings.getNetworkTimeout(), TimeUnit.SECONDS.toMillis(10));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;

Expand Down Expand Up @@ -104,7 +105,8 @@ protected Client.Builder newClient() {
.useHttpCompression(useHttpCompression)
.setDefaultDatabase(ClickHouseServerForTest.getDatabase())
.serverSetting(ServerSettings.ASYNC_INSERT, "0")
.serverSetting(ServerSettings.WAIT_END_OF_QUERY, "1");
.serverSetting(ServerSettings.WAIT_END_OF_QUERY, "1")
.setSharedOperationExecutor(Executors.newCachedThreadPool());
}

@AfterMethod(groups = { "integration" })
Expand Down Expand Up @@ -234,7 +236,6 @@ public void testInsertingPOJOWithNullValueForNonNullableColumn() throws Exceptio
try (InsertResponse response = client.insert(tableName, Collections.singletonList(pojo), settings).get(30, TimeUnit.SECONDS)) {
fail("Should have thrown an exception");
} catch (ClickHouseException e) {
e.printStackTrace();
assertTrue(e.getCause() instanceof IllegalArgumentException);
}
}
Expand Down Expand Up @@ -281,15 +282,12 @@ public void insertRawDataAsync(boolean async) throws Exception {
writer.flush();
client.insert(tableName, new ByteArrayInputStream(data.toByteArray()),
ClickHouseFormat.TSV, localSettings).whenComplete((response, throwable) -> {
OperationMetrics metrics = response.getMetrics();
assertEquals((int)response.getWrittenRows(), 1000 );

List<GenericRecord> records = client.queryAll("SELECT * FROM " + tableName);
assertEquals(records.size(), 1000);
assertTrue(Thread.currentThread().getName()
.startsWith(async ? "ForkJoinPool.commonPool" : "main"), "Threads starts with " + Thread.currentThread().getName());
})
.join(); // wait operation complete. only for tests
.join().close(); // wait operation complete. only for tests
}

@DataProvider
Expand Down Expand Up @@ -660,14 +658,10 @@ public void testCollectionInsert() throws Exception {
out.write(row.getBytes());
}
}, ClickHouseFormat.JSONEachRow, new InsertSettings()).get()) {
System.out.println("Rows written: " + response.getWrittenRows());
}

List<GenericRecord> records = client.queryAll("SELECT * FROM \"" + tableName + "\"" );

for (GenericRecord record : records) {
System.out.println("> " + record.getString(1) + ", " + record.getFloat(2) + ", " + record.getFloat(3));
}
assertEquals(records.size(), 4);
}

// static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ public static Object convert(Object value, Class<?> type, ClickHouseColumn colum
} else if (type == String.class) {
return value.toString();
} else if (type == Boolean.class || type == boolean.class) {
return Boolean.parseBoolean(value.toString());
String str = value.toString();
return !("false".equalsIgnoreCase(str) || "0".equalsIgnoreCase(str));
} else if (type == Byte.class || type == byte.class) {
return Byte.parseByte(value.toString());
} else if (type == Short.class || type == short.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,9 @@ private static String generateSqlTypeSizes(String columnName) {
SQLType type = JdbcUtils.CLICKHOUSE_TYPE_NAME_TO_SQL_TYPE_MAP.get(typeName);
if (type == null) {
try {
type = JdbcUtils.convertToSqlType(ClickHouseColumn.of("v1", typeName).getDataType());
ClickHouseColumn c = ClickHouseColumn.of("v", typeName);
ClickHouseDataType dt = c.getDataType();
type = JdbcUtils.convertToSqlType(dt);
} catch (Exception e) {
log.error("Failed to convert column data type to SQL type: {}", typeName, e);
type = JDBCType.OTHER; // In case of error, return SQL type 0
Expand Down Expand Up @@ -1092,8 +1094,23 @@ public ResultSet getTypeInfo() throws SQLException {
row.put("NULLABLE", nullability);
};

private static final Consumer<Map<String, Object>> TYPE_INFO_VALUE_FUNCTION = row -> {
String typeName = (String) row.get("TYPE_NAME");
SQLType type = JdbcUtils.CLICKHOUSE_TYPE_NAME_TO_SQL_TYPE_MAP.get(typeName);
if (type == null) {
try {
type = JdbcUtils.convertToSqlType(ClickHouseDataType.valueOf(typeName));
} catch (Exception e) {
log.error("Failed to convert column data type to SQL type: {}", typeName, e);
type = JDBCType.OTHER; // In case of error, return SQL type 0
}
}

row.put("DATA_TYPE", type.getVendorTypeNumber());
};

private static final List<Consumer<Map<String, Object>>> GET_TYPE_INFO_MUTATORS = Arrays.asList(
DATA_TYPE_VALUE_FUNCTION,
TYPE_INFO_VALUE_FUNCTION,
NULLABILITY_VALUE_FUNCTION
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import java.sql.Types;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand All @@ -37,7 +40,7 @@ public void testGetColumns() throws Exception {
final String tableName = "get_columns_metadata_test";
try (Statement stmt = conn.createStatement()) {
stmt.executeUpdate("" +
"CREATE TABLE " + tableName + " (id Int32, name String, v1 Nullable(Int8)) " +
"CREATE TABLE " + tableName + " (id Int32, name String NOT NULL, v1 Nullable(Int8), v2 Array(Int8)) " +
"ENGINE MergeTree ORDER BY ()");
}

Expand Down Expand Up @@ -109,6 +112,7 @@ public void testGetColumns() throws Exception {
assertEquals(rs.getInt("DATA_TYPE"), Types.INTEGER);
assertEquals(rs.getObject("DATA_TYPE"), Types.INTEGER);
assertEquals(rs.getString("TYPE_NAME"), "Int32");
assertFalse(rs.getBoolean("NULLABLE"));

assertTrue(rs.next());
assertEquals(rs.getString("TABLE_SCHEM"), getDatabase());
Expand All @@ -117,6 +121,7 @@ public void testGetColumns() throws Exception {
assertEquals(rs.getInt("DATA_TYPE"), Types.VARCHAR);
assertEquals(rs.getObject("DATA_TYPE"), Types.VARCHAR);
assertEquals(rs.getString("TYPE_NAME"), "String");
assertFalse(rs.getBoolean("NULLABLE"));

assertTrue(rs.next());
assertEquals(rs.getString("TABLE_SCHEM"), getDatabase());
Expand All @@ -125,6 +130,16 @@ public void testGetColumns() throws Exception {
assertEquals(rs.getInt("DATA_TYPE"), Types.TINYINT);
assertEquals(rs.getObject("DATA_TYPE"), Types.TINYINT);
assertEquals(rs.getString("TYPE_NAME"), "Nullable(Int8)");
assertTrue(rs.getBoolean("NULLABLE"));

assertTrue(rs.next());
assertEquals(rs.getString("TABLE_SCHEM"), getDatabase());
assertEquals(rs.getString("TABLE_NAME"), tableName);
assertEquals(rs.getString("COLUMN_NAME"), "v2");
assertEquals(rs.getInt("DATA_TYPE"), Types.ARRAY);
assertEquals(rs.getObject("DATA_TYPE"), Types.ARRAY);
assertEquals(rs.getString("TYPE_NAME"), "Array(Int8)");
assertFalse(rs.getBoolean("NULLABLE"));
}
}
}
Expand Down Expand Up @@ -450,6 +465,24 @@ public void testGetTypeInfo() throws Exception {
}
}

@Test(groups = {"integration"})
public void testFindNestedTypes() throws Exception {
try (Connection conn = getJdbcConnection()) {
DatabaseMetaData dbmd = conn.getMetaData();
try (ResultSet rs = dbmd.getTypeInfo()) {
Set<String> nestedTypes = Arrays.stream(ClickHouseDataType.values())
.filter(dt -> dt.isNested()).map(dt -> dt.name()).collect(Collectors.toSet());

while (rs.next()) {
String typeName = rs.getString("TYPE_NAME");
nestedTypes.remove(typeName);
}

assertTrue(nestedTypes.isEmpty(), "Nested types " + nestedTypes + " not found");
}
}
}

@Test(groups = { "integration" })
public void testGetFunctions() throws Exception {
if (ClickHouseVersion.of(getServerVersion()).check("(,23.8]")) {
Expand Down
Loading