diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java index 88fa579dd..6535d2ea7 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java @@ -20,13 +20,15 @@ public class ClickHouseResponseSummary implements Serializable { public static final class Progress implements Serializable { private static final long serialVersionUID = -1447066780591278108L; - static final Progress EMPTY = new Progress(0L, 0L, 0L, 0L, 0L); + static final Progress EMPTY = new Progress(0L, 0L, 0L, 0L, 0L, 0L, 0L); private final long read_rows; private final long read_bytes; private final long total_rows_to_read; private final long written_rows; private final long written_bytes; + private final long elapsed_time; + private final long result_rows; /** * Default constructor. @@ -36,14 +38,17 @@ public static final class Progress implements Serializable { * @param total_rows_to_read Total number of rows to be read * @param written_rows Number of rows written * @param written_bytes Volume of data written in bytes + * @param elapsed_time Query processing time in (ns) */ public Progress(long read_rows, long read_bytes, long total_rows_to_read, long written_rows, - long written_bytes) { + long written_bytes, long elapsed_time, long result_rows) { this.read_rows = read_rows; this.read_bytes = read_bytes; this.total_rows_to_read = total_rows_to_read; this.written_rows = written_rows; this.written_bytes = written_bytes; + this.elapsed_time = elapsed_time; + this.result_rows = result_rows; } public long getReadRows() { @@ -66,6 +71,13 @@ public long getWrittenBytes() { return written_bytes; } + public long getElapsedTime() { + return elapsed_time; + } + + public long getResultRows() { + return result_rows; + } public Progress add(Progress progress) { if (progress == null) { return this; @@ -73,7 +85,8 @@ public Progress add(Progress progress) { return new Progress(read_rows + progress.read_rows, read_bytes + progress.read_bytes, total_rows_to_read + progress.total_rows_to_read, written_rows + progress.written_rows, - written_bytes + progress.written_bytes); + written_bytes + progress.written_bytes,elapsed_time + progress.elapsed_time, + result_rows + progress.result_rows); } public boolean isEmpty() { @@ -301,6 +314,14 @@ public int getUpdateCount() { return updates.get(); } + public long getElapsedTime() { + return progress.get().getElapsedTime(); + } + + public long getResultRows() { + return progress.get().getResultRows(); + } + public boolean isEmpty() { return progress.get().isEmpty() && stats.get().isEmpty(); } diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseResponseSummaryTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseResponseSummaryTest.java index f9e015496..7d66e82b1 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseResponseSummaryTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseResponseSummaryTest.java @@ -19,7 +19,8 @@ public void testConsutrctor() { Assert.assertEquals(summary.getWrittenBytes(), 0L); Assert.assertEquals(summary.getWrittenRows(), 0L); - Progress progress = new Progress(1L, 2L, 3L, 4L, 5L); + Progress progress = new Progress(1L, 2L, 3L, 4L, 5L, + 6L, 7L); Statistics stats = new Statistics(6L, 7L, 8L, true, 9L); summary = new ClickHouseResponseSummary(progress, stats); Assert.assertTrue(summary.getProgress() == progress); @@ -58,7 +59,7 @@ public void testAdd() { Assert.assertEquals(summary.getWrittenBytes(), 0L); Assert.assertEquals(summary.getWrittenRows(), 0L); - summary.add(new Progress(1L, 2L, 3L, 4L, 5L)); + summary.add(new Progress(1L, 2L, 3L, 4L, 5L, 6L, 7L)); Assert.assertEquals(summary.getReadBytes(), 2L); Assert.assertEquals(summary.getReadRows(), 1L); Assert.assertEquals(summary.getTotalRowsToRead(), 3L); @@ -93,13 +94,15 @@ public void testUpdate() { Assert.assertEquals(summary.getWrittenBytes(), 0L); Assert.assertEquals(summary.getWrittenRows(), 0L); - summary.update(new Progress(1L, 2L, 3L, 4L, 5L)); + summary.update(new Progress(1L, 2L, 3L, 4L, 5L, 6L, 7L)); Assert.assertEquals(summary.getReadBytes(), 2L); Assert.assertEquals(summary.getReadRows(), 1L); Assert.assertEquals(summary.getTotalRowsToRead(), 3L); Assert.assertEquals(summary.getUpdateCount(), 1L); Assert.assertEquals(summary.getWrittenBytes(), 5L); Assert.assertEquals(summary.getWrittenRows(), 4L); + Assert.assertEquals(summary.getProgress().getElapsedTime(), 6L); + Assert.assertEquals(summary.getProgress().getResultRows(), 7L); summary.update(new Statistics(6L, 7L, 8L, true, 9L)); Assert.assertEquals(summary.getReadBytes(), 2L); diff --git a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcResponse.java b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcResponse.java index 7248ad722..58050e865 100644 --- a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcResponse.java +++ b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcResponse.java @@ -48,7 +48,7 @@ protected ClickHouseGrpcResponse(ClickHouseConfig config, Map globalClientStats = new ConcurrentHashMap<>(); + private Client(Set endpoints, Map configuration) { this.endpoints = endpoints; this.configuration = configuration; @@ -280,10 +283,14 @@ public void register(Class clazz, TableSchema schema) { public InsertResponse insert(String tableName, List data, InsertSettings settings) throws ClientException, IOException { + + String operationId = startOperation(); + settings.setSetting(INTERNAL_OPERATION_ID, operationId); + globalClientStats.get(operationId).start("serialization"); + if (data == null || data.isEmpty()) { throw new IllegalArgumentException("Data cannot be empty"); } - StopWatch watch = StopWatch.createStarted(); //Add format to the settings if (settings == null) { @@ -318,8 +325,8 @@ public InsertResponse insert(String tableName, } } - watch.stop(); - LOG.debug("Total serialization time: {}", watch.getTime()); + globalClientStats.get(operationId).stop("serialization"); + LOG.debug("Total serialization time: {}", globalClientStats.get(operationId).getElapsedTime("serialization")); return insert(tableName, new ByteArrayInputStream(stream.toByteArray()), settings); } @@ -329,7 +336,13 @@ public InsertResponse insert(String tableName, public InsertResponse insert(String tableName, InputStream data, InsertSettings settings) throws IOException, ClientException { - StopWatch watch = StopWatch.createStarted(); + String operationId = (String) settings.getSetting(INTERNAL_OPERATION_ID); + if (operationId == null) { + operationId = startOperation(); + settings.setSetting(INTERNAL_OPERATION_ID, operationId); + } + OperationStatistics.ClientStatistics clientStats = globalClientStats.remove(operationId); + clientStats.start("insert"); InsertResponse response; try (ClickHouseClient client = createClient()) { ClickHouseRequest.Mutation request = createMutationRequest(client.write(getServerNode()), tableName, settings) @@ -346,14 +359,15 @@ public InsertResponse insert(String tableName, } } try { - response = new InsertResponse(client, future.get()); + response = new InsertResponse(client, future.get(), clientStats); } catch (InterruptedException | ExecutionException e) { throw new ClientException("Operation has likely timed out.", e); } } - watch.stop(); - LOG.debug("Total insert (InputStream) time: {}", watch.getTime()); + clientStats.stop("insert"); + LOG.debug("Total insert (InputStream) time: {}", + clientStats.getElapsedTime("insert")); return response; } @@ -371,6 +385,9 @@ public InsertResponse insert(String tableName, * @return */ public Future query(String sqlQuery, Map qparams, QuerySettings settings) { + + OperationStatistics.ClientStatistics clientStats = new OperationStatistics.ClientStatistics(); + clientStats.start("query"); ClickHouseClient client = createClient(); ClickHouseRequest request = client.read(getServerNode()); @@ -393,7 +410,9 @@ public Future query(String sqlQuery, Map qparams, MDC.put("queryId", settings.getQueryID()); LOG.debug("Executing request: {}", request); try { - future.complete(new QueryResponse(client, request.execute(), settings, format)); + QueryResponse queryResponse = new QueryResponse(client, request.execute(), settings, format, + clientStats); + future.complete(queryResponse); } catch (Exception e) { future.completeExceptionally(e); } finally { @@ -472,4 +491,12 @@ private static Set createFormatWhitelist(String shouldSupport) { } return Collections.unmodifiableSet(formats); } + + private static final String INTERNAL_OPERATION_ID = "operationID"; + + private String startOperation() { + String operationId = UUID.randomUUID().toString(); + globalClientStats.put(operationId, new OperationStatistics.ClientStatistics()); + return operationId; + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/OperationStatistics.java b/client-v2/src/main/java/com/clickhouse/client/api/OperationStatistics.java new file mode 100644 index 000000000..57d642a7c --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/OperationStatistics.java @@ -0,0 +1,156 @@ +package com.clickhouse.client.api; + +import com.clickhouse.client.ClickHouseResponseSummary; +import com.clickhouse.client.api.internal.StopWatch; + +import java.util.HashMap; +import java.util.Map; + +/** + * OperationStatistics objects hold various stats for complete operations. + *

+ * It can be used for logging or monitoring purposes. + */ +public class OperationStatistics { + + public static final ServerStatistics EMPTY_SERVER_STATS = new ServerStatistics(-1, -1, -1, -1, -1, -1, -1); + + public ServerStatistics serverStatistics; + + public ClientStatistics clientStatistics; + + public OperationStatistics(ClientStatistics clientStatistics) { + this.serverStatistics = EMPTY_SERVER_STATS; + this.clientStatistics = clientStatistics; + } + + public void setClientStatistics(ClientStatistics clientStatistics) { + this.clientStatistics = clientStatistics; + } + + public ClientStatistics getClientStatistics() { + return clientStatistics; + } + + public ServerStatistics getServerStatistics() { + return serverStatistics; + } + + public void updateServerStats(ClickHouseResponseSummary summary) { + if (summary == null || summary.equals(ClickHouseResponseSummary.EMPTY)) { + this.serverStatistics = EMPTY_SERVER_STATS; + return; + } + + this.serverStatistics = new ServerStatistics( + summary.getReadRows(), + summary.getReadBytes(), + summary.getTotalRowsToRead(), + summary.getWrittenRows(), + summary.getWrittenBytes(), + summary.getResultRows(), + summary.getElapsedTime() + ); + } + + @Override + public String toString() { + return "OperationStatistics{" + + "\"serverStatistics\"=" + serverStatistics + + ", \"clientStatistics\"=" + clientStatistics + + '}'; + } + + /** + * Stats returned by the server. + *

+ * `-1` means the value is not available. + */ + public static class ServerStatistics { + + /** + * Number of rows read by server from the storage. + */ + public final long numRowsRead; + + /** + * Number of rows written by server to the storage. + */ + public final long numRowsWritten; + + /** + * Estimated number of rows to read from the storage. + *

+ */ + public final long totalRowsToRead; + + /** + * Number of bytes read by server from the storage. + */ + public final long numBytesRead; + + /** + * Number of bytes written by server to the storage. + */ + public final long numBytesWritten; + + /** + * Number of returned rows. + */ + public final long resultRows; + + /** + * Elapsed time in nanoseconds. + */ + public final long elapsedTime; + + + public ServerStatistics(long numRowsRead, long numBytesRead, long totalRowsToRead, long numRowsWritten, long numBytesWritten, long resultRows, long elapsedTime) { + this.numRowsRead = numRowsRead; + this.numBytesRead = numBytesRead; + this.totalRowsToRead = totalRowsToRead; + this.numRowsWritten = numRowsWritten; + this.numBytesWritten = numBytesWritten; + this.resultRows = resultRows; + this.elapsedTime = elapsedTime; + } + + @Override + public String toString() { + return "ServerStatistics{" + + "\"numRowsRead\"=" + numRowsRead + + ", \"numRowsWritten\"=" + numRowsWritten + + ", \"totalRowsToRead\"=" + totalRowsToRead + + ", \"numBytesRead\"=" + numBytesRead + + ", \"numBytesWritten\"=" + numBytesWritten + + ", \"resultRows\"=" + resultRows + + ", \"elapsedTime\"=\"" + elapsedTime + "ns\"" + + '}'; + } + } + + public static class ClientStatistics { + private final Map spans = new HashMap<>(); + + public void start(String spanName) { + spans.computeIfAbsent(spanName, k -> new StopWatch()).start(); + } + + public void stop(String spanName) { + spans.computeIfAbsent(spanName, k -> new StopWatch()).stop(); + } + + public long getElapsedTime(String spanName) { + StopWatch sw = spans.get(spanName); + return sw == null ? -1 : sw.getElapsedTime(); + } + + @Override + public String toString() { + return "ClientStatistics{" + + "\"spans\"=" + spans + + '}'; + } + } + +} diff --git a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java index 7db51b495..41fa0fdb4 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertResponse.java @@ -3,18 +3,24 @@ import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.ClickHouseResponseSummary; +import com.clickhouse.client.api.OperationStatistics; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class InsertResponse implements AutoCloseable { private final ClickHouseResponse responseRef; private final ClickHouseClient client; - public InsertResponse(ClickHouseClient client, ClickHouseResponse responseRef) { + private OperationStatistics operationStatistics; + + public InsertResponse(ClickHouseClient client, ClickHouseResponse responseRef, + OperationStatistics.ClientStatistics clientStatistics) { this.responseRef = responseRef; this.client = client; - } - - public ClickHouseResponseSummary getSummary() { - return responseRef.getSummary(); + this.operationStatistics = new OperationStatistics(clientStatistics); + this.operationStatistics.updateServerStats(responseRef.getSummary()); } @Override @@ -25,4 +31,8 @@ public void close() { client.close(); } } + + public OperationStatistics getOperationStatistics() { + return operationStatistics; + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/StopWatch.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/StopWatch.java new file mode 100644 index 000000000..70c2b76f9 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/StopWatch.java @@ -0,0 +1,41 @@ +package com.clickhouse.client.api.internal; + +import java.util.concurrent.TimeUnit; + +public class StopWatch { + + long elapsedNanoTime = 0; + long startNanoTime; + + public StopWatch() { + // do nothing + } + + public StopWatch(long startNanoTime) { + this.startNanoTime = startNanoTime; + } + + public void start() { + startNanoTime = System.nanoTime(); + } + + public void stop() { + elapsedNanoTime = System.nanoTime() - startNanoTime; + } + + /** + * Returns the elapsed time in milliseconds. + * @return + */ + public long getElapsedTime() { + return TimeUnit.NANOSECONDS.toMillis(elapsedNanoTime); + } + + @Override + public String toString() { + return "{" + + "\"elapsedNanoTime\"=" + elapsedNanoTime + + ", \"elapsedTime\"=" + getElapsedTime() + + '}'; + } +} diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java b/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java index e1933cef6..f53a64ca1 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java @@ -2,6 +2,7 @@ import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.client.api.OperationStatistics; import com.clickhouse.data.ClickHouseFormat; import com.clickhouse.data.ClickHouseInputStream; @@ -34,25 +35,46 @@ public class QueryResponse implements AutoCloseable { private QuerySettings settings; + private OperationStatistics operationStatistics; + + private volatile boolean completed = false; + public QueryResponse(ClickHouseClient client, Future responseRef, - QuerySettings settings, ClickHouseFormat format) { + QuerySettings settings, ClickHouseFormat format, + OperationStatistics.ClientStatistics clientStatistics) { this.client = client; this.responseRef = responseRef; this.format = format; this.settings = settings; + this.operationStatistics = new OperationStatistics(clientStatistics); } - public boolean isDone() { - return responseRef.isDone(); + public boolean isCompleted() { + if (completed) { + return true; + } + if (responseRef.isDone()) { + makeComplete(); + } + + return completed; } public void ensureDone() { - if (!isDone()) { - try { - responseRef.get(completeTimeout, TimeUnit.MILLISECONDS); - } catch (TimeoutException | InterruptedException | ExecutionException e) { - throw new RuntimeException(e); // TODO: handle exception - } + if (!completed) { + // TODO: thread-safety + makeComplete(); + } + } + + private void makeComplete() { + try { + ClickHouseResponse response = responseRef.get(completeTimeout, TimeUnit.MILLISECONDS); + completed = true; + operationStatistics.clientStatistics.stop("query"); + this.operationStatistics.updateServerStats(response.getSummary()); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + throw new RuntimeException(e); // TODO: handle exception } } @@ -77,4 +99,9 @@ public void close() throws Exception { public ClickHouseFormat getFormat() { return format; } + + public OperationStatistics getOperationStatistics() { + ensureDone(); + return operationStatistics; + } } diff --git a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java index 562c090d5..419110521 100644 --- a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java @@ -55,8 +55,8 @@ public void insertSimplePOJOs() throws ClickHouseException, ClientException, IOE } InsertResponse response = client.insert(tableName, simplePOJOs, settings); - ClickHouseResponseSummary summary = response.getSummary(); - assertNotEquals(summary, null); - assertEquals(simplePOJOs.size(), summary.getWrittenRows()); + assertEquals(simplePOJOs.size(), response.getOperationStatistics().getServerStatistics().numRowsWritten); + assertTrue(response.getOperationStatistics().getClientStatistics().getElapsedTime("insert") > 0); + assertTrue(response.getOperationStatistics().getClientStatistics().getElapsedTime("serialization") > 0); } } diff --git a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java index 4a8cdcd5b..8359102df 100644 --- a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java @@ -11,6 +11,7 @@ import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.api.Client; import com.clickhouse.client.api.DataTypeUtils; +import com.clickhouse.client.api.OperationStatistics; import com.clickhouse.client.api.Protocol; import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader; import com.clickhouse.client.api.data_formats.NativeFormatReader; @@ -867,6 +868,54 @@ public void testDataTypes(List columns, List> valueGene } } + @Test(groups = {"integration"}) + public void testQueryMetrics() throws Exception { + prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10); + + QuerySettings settings = new QuerySettings() + .setFormat(ClickHouseFormat.TabSeparated.name()); + + QueryResponse response = client.query("SELECT * FROM " + DATASET_TABLE + " LIMIT 3", + Collections.emptyMap(), settings).get(); + + // Stats should be available after the query is done + OperationStatistics stats = response.getOperationStatistics(); + OperationStatistics.ServerStatistics serverStats = stats.serverStatistics; + System.out.println("Server stats: " + serverStats); + System.out.println("Client stats: " + stats.clientStatistics); + + Assert.assertTrue(serverStats.numBytesRead > 0); + Assert.assertEquals(serverStats.numBytesWritten, 0); + Assert.assertEquals(serverStats.numRowsRead, 10); // 10 rows in the table + Assert.assertEquals(serverStats.numRowsWritten, 0); + Assert.assertEquals(serverStats.resultRows, 3); + + StringBuilder insertStmtBuilder = new StringBuilder(); + insertStmtBuilder.append("INSERT INTO default.").append(DATASET_TABLE).append(" VALUES "); + final int rowsToInsert = 5; + for (int i = 0; i < rowsToInsert; i++) { + insertStmtBuilder.append("("); + Map values = writeValuesRow(insertStmtBuilder, DATASET_COLUMNS, DATASET_VALUE_GENERATORS); + insertStmtBuilder.setLength(insertStmtBuilder.length() - 2); + insertStmtBuilder.append("), "); + } + response = client.query(insertStmtBuilder.toString(), + Collections.emptyMap(), settings).get(); + + serverStats = response.getOperationStatistics().serverStatistics; + System.out.println("Server stats: " + serverStats); + System.out.println("Client stats: " + stats.clientStatistics); + + // Server stats: ServerStatistics{"numRowsRead"=10, "numRowsWritten"=10, "totalRowsToRead"=0, "numBytesRead"=651, "numBytesWritten"=651} + Assert.assertTrue(serverStats.numBytesRead > 0); + Assert.assertTrue(serverStats.numBytesWritten > 0); + Assert.assertEquals(serverStats.numRowsRead, rowsToInsert); // 10 rows in the table + Assert.assertEquals(serverStats.numRowsWritten, rowsToInsert); // 10 rows inserted + Assert.assertEquals(serverStats.totalRowsToRead, 0); + Assert.assertEquals(serverStats.resultRows, rowsToInsert); + Assert.assertTrue(stats.clientStatistics.getElapsedTime("query") > 0); + } + private final static List DATASET_COLUMNS = Arrays.asList( "col1 UInt32", "col2 Int32", @@ -920,37 +969,7 @@ private List> prepareDataSet(String table, List colu insertStmtBuilder.append("INSERT INTO default.").append(table).append(" VALUES "); for (int i = 0; i < rows; i++) { insertStmtBuilder.append("("); - Map values = new HashMap<>(); - Iterator columnIterator = columns.iterator(); - for (Function valueGenerator : valueGenerators) { - Object value = valueGenerator.apply(null); - if (value instanceof String) { - insertStmtBuilder.append('\'').append(value).append('\'').append(", "); - } else if (value instanceof BaseStream) { - insertStmtBuilder.append('['); - BaseStream stream = ((BaseStream) value); - for (Iterator it = stream.iterator(); it.hasNext(); ) { - insertStmtBuilder.append(quoteValue(it.next())).append(", "); - } - insertStmtBuilder.setLength(insertStmtBuilder.length() - 2); - insertStmtBuilder.append("], "); - } else if (value instanceof Map) { - insertStmtBuilder.append("{"); - Map map = (Map) value; - for (Map.Entry entry : map.entrySet()) { - insertStmtBuilder.append(quoteValue(entry.getKey())).append(" : ") - .append(quoteValue(entry.getValue())).append(", "); - } - insertStmtBuilder.setLength(insertStmtBuilder.length() - 2); - insertStmtBuilder.append("}, "); - } else if (value == null) { - insertStmtBuilder.append("NULL, "); - } else { - insertStmtBuilder.append(value).append(", "); - } - values.put(columnIterator.next().split(" ")[0], value); - - } + Map values = writeValuesRow(insertStmtBuilder, columns, valueGenerators); insertStmtBuilder.setLength(insertStmtBuilder.length() - 2); insertStmtBuilder.append("), "); data.add(values); @@ -965,6 +984,39 @@ private List> prepareDataSet(String table, List colu return data; } + private Map writeValuesRow(StringBuilder insertStmtBuilder, List columns, List> valueGenerators ) { + Map values = new HashMap<>(); + Iterator columnIterator = columns.iterator(); + for (Function valueGenerator : valueGenerators) { + Object value = valueGenerator.apply(null); + if (value instanceof String) { + insertStmtBuilder.append('\'').append(value).append('\'').append(", "); + } else if (value instanceof BaseStream) { + insertStmtBuilder.append('['); + BaseStream stream = ((BaseStream) value); + for (Iterator it = stream.iterator(); it.hasNext(); ) { + insertStmtBuilder.append(quoteValue(it.next())).append(", "); + } + insertStmtBuilder.setLength(insertStmtBuilder.length() - 2); + insertStmtBuilder.append("], "); + } else if (value instanceof Map) { + insertStmtBuilder.append("{"); + Map map = (Map) value; + for (Map.Entry entry : map.entrySet()) { + insertStmtBuilder.append(quoteValue(entry.getKey())).append(" : ") + .append(quoteValue(entry.getValue())).append(", "); + } + insertStmtBuilder.setLength(insertStmtBuilder.length() - 2); + insertStmtBuilder.append("}, "); + } else { + insertStmtBuilder.append(value).append(", "); + } + values.put(columnIterator.next().split(" ")[0], value); + + } + return values; + } + private String quoteValue(Object value) { if (value instanceof String) { return '\'' + value.toString() + '\'';