diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/ClickHouseBinaryFormatWriter.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/ClickHouseBinaryFormatWriter.java new file mode 100644 index 000000000..8494c16c3 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/ClickHouseBinaryFormatWriter.java @@ -0,0 +1,110 @@ +package com.clickhouse.client.api.data_formats; + +import com.clickhouse.data.ClickHouseFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Clob; +import java.sql.NClob; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZonedDateTime; +import java.util.List; + +/** + * Experimental API + */ +public interface ClickHouseBinaryFormatWriter { + + /** + * Returns an output stream to which this writer is serializing values. + * Caution: this method is not intended for application usage. + * @return Output stream of the writer + */ + OutputStream getOutputStream(); + + int getRowCount(); + + ClickHouseFormat getFormat(); + + void clearRow(); + + void setValue(String column, Object value); + + void setValue(int colIndex, Object value); + + /** + * Writer current row or block to the output stream. + * Action is idempotent: if there are no new values set - this method has no effect. + * @throws IOException if writing to an output stream causes an error + */ + void commitRow() throws IOException; + + void setByte(String column, byte value); + + void setByte(int colIndex, byte value); + + void setShort(String column, short value); + + void setShort(int colIndex, short value); + + void setInteger(String column, int value); + + void setInteger(int colIndex, int value); + + void setLong(String column, long value); + + void setLong(int colIndex, long value); + + void setBigInteger(int colIndex, BigInteger value); + + void setBigInteger(String column, BigInteger value); + + void setFloat(int colIndex, float value); + + void setFloat(String column, float value); + + void setDouble(int colIndex, double value); + + void setDouble(String column, double value); + + void setBigDecimal(int colIndex, BigDecimal value); + + void setBigDecimal(String column, BigDecimal value); + + void setBoolean(int colIndex, boolean value); + + void setBoolean(String column, boolean value); + + void setString(String column, String value); + + void setString(int colIndex, String value); + + void setDate(String column, LocalDate value); + + void setDate(int colIndex, LocalDate value); + + void setDateTime(String column, LocalDateTime value); + + void setDateTime(int colIndex, LocalDateTime value); + + void setDateTime(String column, ZonedDateTime value); + + void setDateTime(int colIndex, ZonedDateTime value); + + void setList(String column, List value); + + void setList(int colIndex, List value); + + void setInputStream(int colIndex, InputStream in, long len); + + void setInputStream(String column, InputStream in, long len); + + void setReader(int colIndex, Reader reader, long len); + + void setReader(String column, Reader reader, long len); +} diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryFormatWriter.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryFormatWriter.java index ef8c18914..a487da1b9 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryFormatWriter.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryFormatWriter.java @@ -6,10 +6,15 @@ import com.clickhouse.data.ClickHouseFormat; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.math.BigInteger; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZonedDateTime; +import java.util.Arrays; import java.util.List; @@ -21,7 +26,7 @@ *

* Experimental API */ -public class RowBinaryFormatWriter { +public class RowBinaryFormatWriter implements ClickHouseBinaryFormatWriter { private final OutputStream out; @@ -31,6 +36,10 @@ public class RowBinaryFormatWriter { private final boolean defaultSupport; + private int rowCount = 0; + + private boolean rowStarted = false; // indicates if at least one value was written to a row + public RowBinaryFormatWriter(OutputStream out, TableSchema tableSchema, ClickHouseFormat format) { if (format != ClickHouseFormat.RowBinary && format != ClickHouseFormat.RowBinaryWithDefaults) { throw new IllegalArgumentException("Only RowBinary and RowBinaryWithDefaults are supported"); @@ -42,96 +51,233 @@ public RowBinaryFormatWriter(OutputStream out, TableSchema tableSchema, ClickHou this.defaultSupport = format == ClickHouseFormat.RowBinaryWithDefaults; } + @Override + public OutputStream getOutputStream() { + return out; + } + + @Override + public int getRowCount() { + return rowCount; + } + + @Override + public ClickHouseFormat getFormat() { + return defaultSupport ? ClickHouseFormat.RowBinaryWithDefaults : ClickHouseFormat.RowBinary; + } + + @Override + public void clearRow() { + Arrays.fill(row, null); + rowStarted = false; + } + + @Override public void setValue(String column, Object value) { setValue(tableSchema.nameToColumnIndex(column), value); } + @Override public void setValue(int colIndex, Object value) { row[colIndex - 1] = value; + if (!rowStarted) { + rowStarted = true; + } } + @Override public void commitRow() throws IOException { - List columnList = tableSchema.getColumns(); - for (int i = 0; i < row.length; i++) { - ClickHouseColumn column = columnList.get(i); - // here we skip if we have a default value that is MATERIALIZED or ALIAS or ... - if (column.hasDefault() && column.getDefaultValue() != ClickHouseColumn.DefaultValue.DEFAULT) - continue; - if (RowBinaryFormatSerializer.writeValuePreamble(out, defaultSupport, column, row[i])) { - SerializerUtils.serializeData(out, row[i], column); + if (rowStarted) { + List columnList = tableSchema.getColumns(); + for (int i = 0; i < row.length; i++) { + ClickHouseColumn column = columnList.get(i); + // here we skip if we have a default value that is MATERIALIZED or ALIAS or ... + if (column.hasDefault() && column.getDefaultValue() != ClickHouseColumn.DefaultValue.DEFAULT) + continue; + if (RowBinaryFormatSerializer.writeValuePreamble(out, defaultSupport, column, row[i])) { + SerializerUtils.serializeData(out, row[i], column); + } } + clearRow(); + rowCount++; } } + @Override public void setByte(String column, byte value) { setValue(column, value); } + @Override public void setByte(int colIndex, byte value) { setValue(colIndex, value); } + @Override public void setShort(String column, short value) { setValue(column, value); } + @Override public void setShort(int colIndex, short value) { setValue(colIndex, value); } + @Override public void setInteger(String column, int value) { setValue(column, value); } + @Override public void setInteger(int colIndex, int value) { setValue(colIndex, value); } + @Override public void setLong(String column, long value) { setValue(column, value); } + @Override public void setLong(int colIndex, long value) { setValue(colIndex, value); } + @Override + public void setBigInteger(int colIndex, BigInteger value) { + setValue(colIndex, value); + } + + @Override + public void setBigInteger(String column, BigInteger value) { + setValue(column, value); + } + + @Override + public void setFloat(int colIndex, float value) { + setValue(colIndex, value); + } + + @Override + public void setFloat(String column, float value) { + setValue(column, value); + } + + @Override + public void setDouble(int colIndex, double value) { + setValue(colIndex, value); + } + + @Override + public void setDouble(String column, double value) { + setValue(column, value); + } + + @Override + public void setBigDecimal(int colIndex, BigDecimal value) { + setValue(colIndex, value); + } + + @Override + public void setBigDecimal(String column, BigDecimal value) { + setValue(column, value); + } + + @Override + public void setBoolean(int colIndex, boolean value) { + setValue(colIndex, value); + } + + @Override + public void setBoolean(String column, boolean value) { + setValue(column, value); + } + + @Override public void setString(String column, String value) { setValue(column, value); } + @Override public void setString(int colIndex, String value) { setValue(colIndex, value); } + @Override public void setDate(String column, LocalDate value) { setValue(column, value); } + @Override public void setDate(int colIndex, LocalDate value) { setValue(colIndex, value); } + @Override public void setDateTime(String column, LocalDateTime value) { setValue(column, value); } + @Override public void setDateTime(int colIndex, LocalDateTime value) { setValue(colIndex, value); } + @Override public void setDateTime(String column, ZonedDateTime value) { setValue(column, value); } + @Override public void setDateTime(int colIndex, ZonedDateTime value) { setValue(colIndex, value); } + @Override public void setList(String column, List value) { setValue(column, value); } + @Override public void setList(int colIndex, List value) { setValue(colIndex, value); } + + @Override + public void setInputStream(int colIndex, InputStream in, long len) { + setValue(colIndex, new InputStreamHolder(in, len)); + } + + @Override + public void setInputStream(String column, InputStream in, long len) { + setValue(column, new InputStreamHolder(in, len)); + } + + @Override + public void setReader(int colIndex, Reader reader, long len) { + setValue(colIndex, new ReaderHolder(reader, len)); + } + + @Override + public void setReader(String column, Reader reader, long len) { + setValue(column, new ReaderHolder(reader, len)); + } + + private static class InputStreamHolder { + final InputStream stream; + final long length; + InputStreamHolder(InputStream stream, long length) { + this.stream = stream; + this.length = length; + } + } + + private static class ReaderHolder { + final Reader read; + final long length; + ReaderHolder(Reader reader, long length) { + this.read = reader; + this.length = length; + } + } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java index accb65daa..def76f034 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java @@ -3,14 +3,18 @@ import com.clickhouse.client.api.Client; import com.clickhouse.client.api.ClientConfigProperties; import com.clickhouse.client.api.internal.ServerSettings; +import com.clickhouse.client.api.metadata.TableSchema; import com.clickhouse.client.api.query.GenericRecord; import com.clickhouse.client.api.query.QuerySettings; import com.clickhouse.data.ClickHouseDataType; import com.clickhouse.jdbc.internal.ClientInfoProperties; +import com.clickhouse.jdbc.internal.DriverProperties; import com.clickhouse.jdbc.internal.JdbcConfiguration; import com.clickhouse.jdbc.internal.ExceptionUtils; import com.clickhouse.jdbc.internal.JdbcUtils; +import com.clickhouse.jdbc.internal.StatementParser; import com.clickhouse.jdbc.metadata.DatabaseMetaDataImpl; +import com.google.common.collect.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -360,7 +364,20 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency, in @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { checkOpen(); - return new PreparedStatementImpl(this, sql); + + StatementParser.ParsedStatement parsedStatement = StatementParser.parsePreparedStatement(sql); + + if (config.isBetaFeatureEnabled(DriverProperties.BETA_ROW_BINARY_WRITER)) { + if (parsedStatement.getType() == StatementParser.StatementType.INSERT) { + if (!parsedStatement.hasColumnList() && !PreparedStatementImpl.FUNC_DETECT_REGEXP.matcher(sql).find()) { + TableSchema tableSchema = client.getTableSchema(parsedStatement.getTableName(), schema); + if (tableSchema.getColumns().size() == parsedStatement.getArgumentCount()) { + return new WriterStatementImpl(this, sql, tableSchema, parsedStatement); + } + } + } + } + return new PreparedStatementImpl(this, sql, parsedStatement); } @Override diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/PreparedStatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/PreparedStatementImpl.java index e26856363..e649c827d 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/PreparedStatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/PreparedStatementImpl.java @@ -5,6 +5,7 @@ import com.clickhouse.data.Tuple; import com.clickhouse.jdbc.internal.ExceptionUtils; import com.clickhouse.jdbc.internal.JdbcUtils; +import com.clickhouse.jdbc.internal.StatementParser; import com.clickhouse.jdbc.metadata.ParameterMetaDataImpl; import com.clickhouse.jdbc.metadata.ResultSetMetaDataImpl; import org.slf4j.Logger; @@ -45,6 +46,7 @@ import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; import java.util.*; +import java.util.regex.Pattern; public class PreparedStatementImpl extends StatementImpl implements PreparedStatement, JdbcV2Wrapper { private static final Logger LOG = LoggerFactory.getLogger(PreparedStatementImpl.class); @@ -57,26 +59,36 @@ public class PreparedStatementImpl extends StatementImpl implements PreparedStat private final Calendar defaultCalendar; + // common fields private final String originalSql; private final String [] sqlSegments; - private String [] valueSegments; private final Object [] parameters; + private final StatementParser.StatementType statementType; + + // insert + private String [] valueSegments; private String insertIntoSQL; - private final StatementType statementType; private final ParameterMetaData parameterMetaData; private ResultSetMetaData resultSetMetaData = null; - public PreparedStatementImpl(ConnectionImpl connection, String sql) throws SQLException { + // Detects if any of the arguments is within function parameters + static final Pattern FUNC_DETECT_REGEXP = Pattern.compile( + "\\b(?!values?\\b)[A-Za-z_]\\w*\\([^)]*\\?[^)]*\\)", + Pattern.CASE_INSENSITIVE); + + private static final Pattern VALUES_PARAMETER_SPLIT = Pattern.compile("\\?(?=(?:[^']*'[^']*')*[^']*$)"); + + public PreparedStatementImpl(ConnectionImpl connection, String sql, StatementParser.ParsedStatement parsedStatement) throws SQLException { super(connection); this.isPoolable = true; // PreparedStatement is poolable by default this.originalSql = sql.trim(); //Split the sql string into an array of strings around question mark tokens - this.sqlSegments = splitStatement(originalSql); - this.statementType = parseStatementType(originalSql); + this.sqlSegments = parsedStatement.getSqlSegments(); + this.statementType = parsedStatement.getType(); - if (this.statementType == StatementType.INSERT) { + if (this.statementType == StatementParser.StatementType.INSERT) { insertIntoSQL = originalSql.substring(0, originalSql.indexOf("VALUES") + 6); valueSegments = originalSql.substring(originalSql.indexOf("VALUES") + 6).split("\\?"); } @@ -243,7 +255,7 @@ public boolean execute() throws SQLException { @Override public void addBatch() throws SQLException { checkClosed(); - if (statementType == StatementType.INSERT) { + if (statementType == StatementParser.StatementType.INSERT) { // adding values to the end of big INSERT statement. super.addBatch(compileSql(valueSegments)); } else { @@ -254,7 +266,7 @@ public void addBatch() throws SQLException { @Override public int[] executeBatch() throws SQLException { checkClosed(); - if (statementType == StatementType.INSERT && !batch.isEmpty()) { + if (statementType == StatementParser.StatementType.INSERT && !batch.isEmpty()) { // write insert into as batch to avoid multiple requests StringBuilder sb = new StringBuilder(); sb.append(insertIntoSQL).append(" "); @@ -334,7 +346,7 @@ public ResultSetMetaData getMetaData() throws SQLException { if (resultSetMetaData == null && currentResultSet == null) { // before execution - if (statementType == StatementType.SELECT) { + if (statementType == StatementParser.StatementType.SELECT) { try { // Replace '?' with NULL to make SQL valid for DESCRIBE String sql = JdbcUtils.replaceQuestionMarks(originalSql, JdbcUtils.NULL); @@ -362,33 +374,43 @@ public ResultSetMetaData getMetaData() throws SQLException { @Override public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException { checkClosed(); + parameters[parameterIndex - 1] = encodeObject(sqlDateToInstant(x, cal)); + } + + protected Instant sqlDateToInstant(Date x, Calendar cal) { LocalDate d = x.toLocalDate(); Calendar c = (Calendar) (cal != null ? cal : defaultCalendar).clone(); c.clear(); c.set(d.getYear(), d.getMonthValue() - 1, d.getDayOfMonth(), 0, 0, 0); - parameters[parameterIndex - 1] = encodeObject(c.toInstant()); + return c.toInstant(); } @Override public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException { checkClosed(); + parameters[parameterIndex - 1] = encodeObject(sqlTimeToInstant(x, cal)); + } + protected Instant sqlTimeToInstant(Time x, Calendar cal) { LocalTime t = x.toLocalTime(); Calendar c = (Calendar) (cal != null ? cal : defaultCalendar).clone(); c.clear(); c.set(1970, Calendar.JANUARY, 1, t.getHour(), t.getMinute(), t.getSecond()); - parameters[parameterIndex - 1] = encodeObject(c.toInstant()); + return c.toInstant(); } @Override public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { checkClosed(); + parameters[parameterIndex - 1] = encodeObject(sqlTimestampToZDT(x, cal)); + } + protected ZonedDateTime sqlTimestampToZDT(Timestamp x, Calendar cal) { LocalDateTime ldt = x.toLocalDateTime(); Calendar c = (Calendar) (cal != null ? cal : defaultCalendar).clone(); c.clear(); c.set(ldt.getYear(), ldt.getMonthValue() - 1, ldt.getDayOfMonth(), ldt.getHour(), ldt.getMinute(), ldt.getSecond()); - parameters[parameterIndex - 1] = encodeObject(c.toInstant().atZone(ZoneId.of("UTC")).withNano(x.getNanos())); + return c.toInstant().atZone(ZoneId.of("UTC")).withNano(x.getNanos()); } @Override @@ -787,74 +809,7 @@ private static String encodeObject(Object x) throws SQLException { } } - private static String escapeString(String x) { return x.replace("\\", "\\\\").replace("'", "\\'");//Escape single quotes } - - private static String [] splitStatement(String sql) { - List segments = new ArrayList<>(); - char[] chars = sql.toCharArray(); - int segmentStart = 0; - for (int i = 0; i < chars.length; i++) { - char c = chars[i]; - if (c == '\'' || c == '"' || c == '`') { - // string literal or identifier - i = skip(chars, i + 1, c, true); - } else if (c == '/' && lookahead(chars, i) == '*') { - // block comment - int end = sql.indexOf("*/", i); - if (end == -1) { - // missing comment end - break; - } - i = end + 1; - } else if (c == '#' || (c == '-' && lookahead(chars, i) == '-')) { - // line comment - i = skip(chars, i + 1, '\n', false); - } else if (c == '?') { - // question mark - segments.add(sql.substring(segmentStart, i)); - segmentStart = i + 1; - } - } - if (segmentStart < chars.length) { - segments.add(sql.substring(segmentStart)); - } else { - // add empty segment in case question mark was last char of sql - segments.add(""); - } - return segments.toArray(new String[0]); - } - - private static int skip(char[] chars, int from, char until, boolean escape) { - for (int i = from; i < chars.length; i++) { - char curr = chars[i]; - if (escape) { - char next = lookahead(chars, i); - if ((curr == '\\' && (next == '\\' || next == until)) || (curr == until && next == until)) { - // should skip: - // 1) double \\ (backslash escaped with backslash) - // 2) \[until] ([until] char, escaped with backslash) - // 3) [until][until] ([until] char, escaped with [until]) - i++; - continue; - } - } - - if (curr == until) { - return i; - } - } - return chars.length; - } - - private static char lookahead(char[] chars, int pos) { - pos = pos + 1; - if (pos >= chars.length) { - return '\0'; - } - return chars[pos]; - } - } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index 168c9f9da..d5a394a89 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -7,8 +7,9 @@ import com.clickhouse.client.api.metrics.ServerMetrics; import com.clickhouse.client.api.query.QueryResponse; import com.clickhouse.client.api.query.QuerySettings; -import com.clickhouse.jdbc.internal.JdbcUtils; import com.clickhouse.jdbc.internal.ExceptionUtils; +import com.clickhouse.jdbc.internal.JdbcUtils; +import com.clickhouse.jdbc.internal.StatementParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,19 +24,18 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; public class StatementImpl implements Statement, JdbcV2Wrapper { private static final Logger LOG = LoggerFactory.getLogger(StatementImpl.class); ConnectionImpl connection; - private int queryTimeout; + protected int queryTimeout; protected boolean closed; protected ResultSetImpl currentResultSet; - private OperationMetrics metrics; + protected OperationMetrics metrics; protected List batch; private String lastSql; - private volatile String lastQueryId; + protected volatile String lastQueryId; private String schema; private int maxRows; protected boolean isPoolable = false; // Statement is not poolable by default @@ -56,69 +56,6 @@ protected void checkClosed() throws SQLException { } } - public enum StatementType { - SELECT, INSERT, DELETE, UPDATE, CREATE, DROP, ALTER, TRUNCATE, USE, SHOW, DESCRIBE, EXPLAIN, SET, KILL, OTHER, INSERT_INTO_SELECT - } - - public static StatementType parseStatementType(String sql) { - if (sql == null) { - return StatementType.OTHER; - } - - String trimmedSql = sql.trim(); - if (trimmedSql.isEmpty()) { - return StatementType.OTHER; - } - - trimmedSql = BLOCK_COMMENT.matcher(trimmedSql).replaceAll("").trim(); // remove comments - String[] lines = trimmedSql.split("\n"); - for (String line : lines) { - String trimmedLine = line.trim(); - //https://clickhouse.com/docs/en/sql-reference/syntax#comments - if (!trimmedLine.startsWith("--") && !trimmedLine.startsWith("#!") && !trimmedLine.startsWith("#")) { - String[] tokens = trimmedLine.split("\\s+"); - if (tokens.length == 0) { - continue; - } - - switch (tokens[0].toUpperCase()) { - case "SELECT": return StatementType.SELECT; - case "WITH": return StatementType.SELECT; - case "INSERT": - for (String token : tokens) { - if (token.equalsIgnoreCase("SELECT")) { - return StatementType.INSERT_INTO_SELECT; - } - } - return StatementType.INSERT; - case "DELETE": return StatementType.DELETE; - case "UPDATE": return StatementType.UPDATE; - case "CREATE": return StatementType.CREATE; - case "DROP": return StatementType.DROP; - case "ALTER": return StatementType.ALTER; - case "TRUNCATE": return StatementType.TRUNCATE; - case "USE": return StatementType.USE; - case "SHOW": return StatementType.SHOW; - case "DESCRIBE": return StatementType.DESCRIBE; - case "EXPLAIN": return StatementType.EXPLAIN; - case "SET": return StatementType.SET; - case "KILL": return StatementType.KILL; - default: return StatementType.OTHER; - } - } - } - - return StatementType.OTHER; - } - - protected static String parseTableName(String sql) { - String[] tokens = sql.trim().split("\\s+"); - if (tokens.length < 3) { - return null; - } - - return tokens[2]; - } protected static String parseJdbcEscapeSyntax(String sql) { LOG.trace("Original SQL: {}", sql); @@ -165,7 +102,7 @@ private void closePreviousResultSet() { } } - public ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) throws SQLException { + protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) throws SQLException { checkClosed(); // Closing before trying to do next request. Otherwise, deadlock because previous connection will not be // release before this one completes. @@ -214,13 +151,14 @@ public ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) throws @Override public int executeUpdate(String sql) throws SQLException { checkClosed(); - return executeUpdateImpl(sql, parseStatementType(sql), new QuerySettings().setDatabase(schema)); + return executeUpdateImpl(sql, StatementParser.parsedStatement(sql).getType(), new QuerySettings().setDatabase(schema)); } - protected int executeUpdateImpl(String sql, StatementType type, QuerySettings settings) throws SQLException { + protected int executeUpdateImpl(String sql, StatementParser.StatementType type, QuerySettings settings) throws SQLException { checkClosed(); - if (type == StatementType.SELECT || type == StatementType.SHOW || type == StatementType.DESCRIBE || type == StatementType.EXPLAIN) { + if (type == StatementParser.StatementType.SELECT || type == StatementParser.StatementType.SHOW + || type == StatementParser.StatementType.DESCRIBE || type == StatementParser.StatementType.EXPLAIN) { throw new SQLException("executeUpdate() cannot be called with a SELECT/SHOW/DESCRIBE/EXPLAIN statement", ExceptionUtils.SQL_STATE_SQL_ERROR); } @@ -345,15 +283,18 @@ public void setCursorName(String name) throws SQLException { @Override public boolean execute(String sql) throws SQLException { checkClosed(); - return executeImpl(sql, parseStatementType(sql), new QuerySettings().setDatabase(schema)); + return executeImpl(sql, StatementParser.parsedStatement(sql).getType(), new QuerySettings().setDatabase(schema)); } - public boolean executeImpl(String sql, StatementType type, QuerySettings settings) throws SQLException { + public boolean executeImpl(String sql, StatementParser.StatementType type, QuerySettings settings) throws SQLException { checkClosed(); - if (type == StatementType.SELECT || type == StatementType.SHOW || type == StatementType.DESCRIBE || type == StatementType.EXPLAIN) { - executeQueryImpl(sql, settings); // keep open to allow getResultSet() + if (type == StatementParser.StatementType.SELECT || + type == StatementParser.StatementType.SHOW || + type == StatementParser.StatementType.DESCRIBE || + type == StatementParser.StatementType.EXPLAIN) { + currentResultSet = executeQueryImpl(sql, settings); // keep open to allow getResultSet() return true; - } else if(type == StatementType.SET) { + } else if(type == StatementParser.StatementType.SET) { executeUpdateImpl(sql, type, settings); //SET ROLE List tokens = JdbcUtils.tokenizeSQL(sql); @@ -377,7 +318,7 @@ public boolean executeImpl(String sql, StatementType type, QuerySettings setting } } return false; - } else if (type == StatementType.USE) { + } else if (type == StatementParser.StatementType.USE) { executeUpdateImpl(sql, type, settings); //USE Database List tokens = JdbcUtils.tokenizeSQL(sql); @@ -633,6 +574,4 @@ public String enquoteNCharLiteral(String val) throws SQLException { public String getLastQueryId() { return lastQueryId; } - - private static final Pattern BLOCK_COMMENT = Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL); } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java new file mode 100644 index 000000000..6c4d55be5 --- /dev/null +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java @@ -0,0 +1,474 @@ +package com.clickhouse.jdbc; + +import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatWriter; +import com.clickhouse.client.api.data_formats.RowBinaryFormatWriter; +import com.clickhouse.client.api.insert.InsertResponse; +import com.clickhouse.client.api.insert.InsertSettings; +import com.clickhouse.client.api.metadata.TableSchema; +import com.clickhouse.data.ClickHouseFormat; +import com.clickhouse.jdbc.internal.ExceptionUtils; +import com.clickhouse.jdbc.internal.StatementParser; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLType; +import java.sql.SQLXML; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Calendar; +import java.util.concurrent.TimeUnit; + +/** + * Implements data streaming through Client Writer API. + * See {@link PreparedStatementImpl} + * + */ +public class WriterStatementImpl extends PreparedStatementImpl implements PreparedStatement { + + + private ByteArrayOutputStream out; + private ClickHouseBinaryFormatWriter writer; + private final TableSchema tableSchema; + + public WriterStatementImpl(ConnectionImpl connection, String originalSql, TableSchema tableSchema, StatementParser.ParsedStatement parsedStatement) + throws SQLException { + super(connection, originalSql, parsedStatement); + + this.tableSchema = tableSchema; + try { + resetWriter(); + } catch (IOException e) { + throw new SQLException(e); + } + } + + private void resetWriter() throws IOException { + if (out != null) { + out.close(); + } + + out = new ByteArrayOutputStream(); + writer = new RowBinaryFormatWriter(out, tableSchema, tableSchema.hasDefaults() ? + ClickHouseFormat.RowBinaryWithDefaults : ClickHouseFormat.RowBinary); + } + + @Override + public ResultSet executeQuery() throws SQLException { + checkClosed(); + throw new UnsupportedOperationException("bug. This PreparedStatement implementation should not be used with queries"); + } + + @Override + public int executeUpdate() throws SQLException { + return (int) this.executeLargeUpdate(); + } + + @Override + public boolean getMoreResults() throws SQLException { + return false; // no result sets + } + + @Override + public ResultSet getResultSet() throws SQLException { + return null; // no result set + } + + @Override + public long executeLargeUpdate() throws SQLException { + checkClosed(); + + // commit whatever changes + try { + writer.commitRow(); + } catch (Exception e) { + throw new SQLException(e); + } + + int updateCount = 0; + InputStream in = new ByteArrayInputStream(out.toByteArray()); + InsertSettings settings = new InsertSettings(); + try (InsertResponse response = queryTimeout == 0 ? + connection.client.insert(tableSchema.getTableName(),in, writer.getFormat(), settings).get() + : connection.client.insert(tableSchema.getTableName(),in, writer.getFormat(), settings).get(queryTimeout, TimeUnit.SECONDS)) { + currentResultSet = null; + updateCount = Math.max(0, (int) response.getWrittenRows()); // when statement alters schema no result rows returned. + metrics = response.getMetrics(); + lastQueryId = response.getQueryId(); + } catch (Exception e) { + throw ExceptionUtils.toSqlState(e); + } finally { + try { + resetWriter(); + } catch (Exception e) { + // ignore + } + } + return updateCount; + } + + @Override + public void setNull(int parameterIndex, int sqlType) throws SQLException { + checkClosed(); + writer.setValue(parameterIndex, null); + } + + @Override + public void setBoolean(int parameterIndex, boolean x) throws SQLException { + checkClosed(); + writer.setValue(parameterIndex, x); + } + + @Override + public void setByte(int parameterIndex, byte x) throws SQLException { + checkClosed(); + writer.setByte(parameterIndex, x); + } + + @Override + public void setShort(int parameterIndex, short x) throws SQLException { + checkClosed(); + writer.setShort(parameterIndex, x); + } + + @Override + public void setInt(int parameterIndex, int x) throws SQLException { + checkClosed(); + writer.setInteger(parameterIndex, x); + } + + @Override + public void setLong(int parameterIndex, long x) throws SQLException { + checkClosed(); + writer.setLong(parameterIndex, x); + } + + @Override + public void setFloat(int parameterIndex, float x) throws SQLException { + checkClosed(); + writer.setFloat(parameterIndex, x); + } + + @Override + public void setDouble(int parameterIndex, double x) throws SQLException { + checkClosed(); + writer.setDouble(parameterIndex, x); + } + + @Override + public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException { + checkClosed(); + writer.setBigDecimal(parameterIndex, x); + } + + @Override + public void setString(int parameterIndex, String x) throws SQLException { + checkClosed(); + writer.setString(parameterIndex, x); + } + + @Override + public void setBytes(int parameterIndex, byte[] x) throws SQLException { + checkClosed(); + + } + + @Override + public void setDate(int parameterIndex, Date x) throws SQLException { + setDate(parameterIndex, x, null); + } + + @Override + public void setTime(int parameterIndex, Time x) throws SQLException { + setTime(parameterIndex, x, null); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException { + setTimestamp(parameterIndex, x, null); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException { + checkClosed(); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException { + checkClosed(); + + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException { + checkClosed(); + + } + + @Override + public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException { + checkClosed(); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException { + checkClosed(); + } + + + @Override + public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { + checkClosed(); + + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException { + checkClosed(); + + } + + @Override + public void setCharacterStream(int parameterIndex, Reader x, int length) throws SQLException { + checkClosed(); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader x, long length) throws SQLException { + checkClosed(); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader x) throws SQLException { + checkClosed(); + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader x) throws SQLException { + checkClosed(); + + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader x, long length) throws SQLException { + checkClosed(); + + } + + @Override + public void clearParameters() throws SQLException { + checkClosed(); + writer.clearRow(); + } + + @Override + public boolean execute() throws SQLException { + executeLargeUpdate(); + return false; // no result set + } + + @Override + public void addBatch() throws SQLException { + checkClosed(); + try { + writer.commitRow(); + } catch (Exception e) { + throw new SQLException(e); + } + } + + @Override + public void setClob(int parameterIndex, Clob x) throws SQLException { + checkClosed(); + setClob(parameterIndex, x.getCharacterStream()); + } + + @Override + public void setClob(int parameterIndex, Reader x) throws SQLException { + checkClosed(); + setClob(parameterIndex, x, -1); + } + + @Override + public void setClob(int parameterIndex, Reader x, long length) throws SQLException { + checkClosed(); + writer.setReader(parameterIndex, x, length); + } + + @Override + public void setBlob(int parameterIndex, Blob x) throws SQLException { + checkClosed(); + setBlob(parameterIndex, x.getBinaryStream(), x.length()); + } + + @Override + public void setBlob(int parameterIndex, InputStream x, long length) throws SQLException { + checkClosed(); + writer.setInputStream(parameterIndex, x, length); + } + + @Override + public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException { + checkClosed(); + writer.setInputStream(parameterIndex, inputStream, -1); + } + + @Override + public void setNClob(int parameterIndex, Reader x, long length) throws SQLException { + checkClosed(); + writer.setReader(parameterIndex, x, length); + } + + @Override + public void setNClob(int parameterIndex, NClob x) throws SQLException { + checkClosed(); + setNClob(parameterIndex, x.getCharacterStream(), x.length()); + } + + @Override + public void setNClob(int parameterIndex, Reader x) throws SQLException { + checkClosed(); + setNClob(parameterIndex, x, -1); + } + + @Override + public void setSQLXML(int parameterIndex, SQLXML x) throws SQLException { + checkClosed(); + writer.setReader(parameterIndex, x.getCharacterStream(), -1); + } + + @Override + public void setArray(int parameterIndex, Array x) throws SQLException { + checkClosed(); + writer.setValue(parameterIndex, x.getArray()); + } + + @Override + public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException { + checkClosed(); + writer.setValue(parameterIndex, sqlDateToInstant(x, cal)); + } + + @Override + public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException { + checkClosed(); + writer.setValue(parameterIndex, sqlTimeToInstant(x, cal)); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { + checkClosed(); + writer.setDateTime(parameterIndex, sqlTimestampToZDT(x, cal)); + } + + @Override + public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException { + checkClosed(); + writer.setValue(parameterIndex, null); + } + + @Override + public void setURL(int parameterIndex, URL x) throws SQLException { + checkClosed(); + + } + + @Override + public void setRowId(int parameterIndex, RowId x) throws SQLException { + checkClosed(); + throw new SQLException("ROWID is not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED); + } + + @Override + public void setNString(int parameterIndex, String value) throws SQLException { + checkClosed(); + writer.setString(parameterIndex, value); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException { + checkClosed(); + // TODO: make proper data conversion in setObject methods + writer.setValue(parameterIndex, x); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException { + checkClosed(); + writer.setValue(parameterIndex, x); + } + + @Override + public void setObject(int parameterIndex, Object x) throws SQLException { + checkClosed(); + writer.setValue(parameterIndex, x); + } + + @Override + public void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException { + setObject(parameterIndex, x, targetSqlType, 0); + } + + @Override + public void setObject(int parameterIndex, Object x, SQLType targetSqlType, int scaleOrLength) throws SQLException { + checkClosed(); + writer.setValue(parameterIndex, x); + } + + @Override + public void close() throws SQLException { + super.close(); + try { + if (out != null) { + out.close(); + out = null; + } + writer = null; + } catch (Exception e) { + // ignore + } + } + + @Override + public void cancel() throws SQLException { + try { + resetWriter(); + } catch (Exception e ) { + throw new SQLException(e); + } + } + + @Override + public int[] executeBatch() throws SQLException { + checkClosed(); + int batchSize = writer.getRowCount(); + long rowsInserted = executeLargeUpdate(); + int[] results = new int[batchSize]; + Arrays.fill(results, batchSize == rowsInserted? 1 : PreparedStatement.SUCCESS_NO_INFO); + return results; + } + + @Override + public long[] executeLargeBatch() throws SQLException { + checkClosed(); + int batchSize = writer.getRowCount(); + long rowsInserted = executeLargeUpdate(); + long[] results = new long[batchSize]; + Arrays.fill(results, batchSize == rowsInserted? 1 : PreparedStatement.SUCCESS_NO_INFO); + return results; + } +} diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/DriverProperties.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/DriverProperties.java index c2d82f246..2a257c4b5 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/DriverProperties.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/DriverProperties.java @@ -22,7 +22,16 @@ public enum DriverProperties { * query settings to be passed along with query operation. * {@see com.clickhouse.client.api.query.QuerySettings} */ - DEFAULT_QUERY_SETTINGS("default_query_settings", null); + DEFAULT_QUERY_SETTINGS("default_query_settings", null), + + /** + * Enables row binary writer for simple insert statements when + * PreparedStatement is used. Has limitation and can be used with a simple form of insert like; + * {@code INSERT INTO t VALUES (?, ?, ?...)} + */ + BETA_ROW_BINARY_WRITER("beta.row_binary_for_simple_insert", "false"), + + ; private final String key; private final String defaultValue; diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java index 8fce772e0..5b2b93396 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java @@ -264,4 +264,8 @@ public static String getDefaultClientName() { return jdbcName.toString(); } + public boolean isBetaFeatureEnabled(DriverProperties prop) { + String value = driverProperties.getOrDefault(prop.getKey(), prop.getDefaultValue()); + return Boolean.parseBoolean(value); + } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java index 1a48260f1..6c7a2cdbb 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcUtils.java @@ -281,6 +281,19 @@ public static String escapeQuotes(String str) { .replace("\"", "\\\""); } + private static Pattern UNQUOTE_TABLE_NAME = Pattern.compile( + "^[\\\"`]?(.+?)[\\\"`]?$" + ); + + public static String unQuoteTableName(String str) { + Matcher matcher = UNQUOTE_TABLE_NAME.matcher(str.trim()); + if (matcher.find()) { + return matcher.group(1); + } else { + return str; + } + } + public static final String NULL = "NULL"; private static final Pattern REPLACE_Q_MARK_PATTERN = Pattern.compile("(\"[^\"]*\"|`[^`]*`|'[^']*')|(\\?)"); diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/StatementParser.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/StatementParser.java new file mode 100644 index 000000000..6da1ba403 --- /dev/null +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/StatementParser.java @@ -0,0 +1,265 @@ +package com.clickhouse.jdbc.internal; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +public class StatementParser { + + + public enum StatementType { + SELECT, INSERT, DELETE, UPDATE, CREATE, DROP, ALTER, TRUNCATE, USE, SHOW, DESCRIBE, EXPLAIN, SET, KILL, OTHER, INSERT_INTO_SELECT + } + + public static ParsedStatement parsedStatement(String sql) { + return parseStatementType(sql); + } + + public static ParsedStatement parsePreparedStatement(String sql) { + ParsedStatement parsedStatement = parseStatementType(sql); + String[] sqlSegments = splitStatement(sql); + parsedStatement.setSqlSegments(sqlSegments); + return parsedStatement; + } + + private static final Pattern BLOCK_COMMENT = Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL); + + static ParsedStatement parseStatementType(String sql) { + String trimmedSql = sql == null ? "" : sql.trim(); + if (trimmedSql.isEmpty()) { + return new ParsedStatement(StatementType.OTHER, ""); + } + + trimmedSql = BLOCK_COMMENT.matcher(trimmedSql).replaceAll("").trim(); // remove comments + String[] lines = trimmedSql.split("\n"); + for (int i = 0; i < lines.length; i++) { + String trimmedLine = lines[i].trim(); + //https://clickhouse.com/docs/en/sql-reference/syntax#comments + if (!trimmedLine.startsWith("--") && !trimmedLine.startsWith("#!") && !trimmedLine.startsWith("#")) { + String[] tokens = trimmedLine.split("\\s+"); + if (tokens.length == 0) { + continue; + } + switch (tokens[0].toUpperCase()) { + case "SELECT": + return new ParsedStatement(StatementType.SELECT, trimmedSql); + case "WITH": + return new ParsedStatement(StatementType.SELECT, trimmedSql); + case "INSERT": + // TODO: it is not optimal to re-parse current line + boolean hasSelect = false; + boolean prevWasInto = false; + boolean prevWasTable = false; + boolean hasValues = false; + boolean hasColumnList = false; + String tableName = ""; + for (int j = i; j < lines.length; j++) { + trimmedLine = lines[j].trim(); + if (!trimmedLine.startsWith("--") && !trimmedLine.startsWith("#!") && !trimmedLine.startsWith("#")) { + tokens = trimmedLine.split("\\s+"); + if (tokens.length == 0) { + continue; + } + } + for (String token : tokens) { + if (!hasColumnList && !hasValues && token.contains("(")) { + hasColumnList = true; + } + if (token.equalsIgnoreCase("SELECT")) { + hasSelect = true; + break; // should be after we have found everything useful + } else if (token.equalsIgnoreCase("INTO")) { + prevWasInto = true; + } else if (token.equalsIgnoreCase("TABLE")) { + prevWasTable = true; + } else if (tableName.isEmpty() && (prevWasTable || prevWasInto)) { + tableName = extractTableNameFromSegment(token); + } else if (token.equalsIgnoreCase("VALUES")) { + hasValues = true; + } + } + } + ParsedStatement parsedStatement = + new ParsedStatement(hasSelect ? StatementType.INSERT_INTO_SELECT : StatementType.INSERT, trimmedSql); + parsedStatement.setTableName(tableName); + parsedStatement.setHasColumnList(hasColumnList); + return parsedStatement; + case "DELETE": + return new ParsedStatement(StatementType.DELETE, trimmedSql); + case "UPDATE": + return new ParsedStatement(StatementType.UPDATE, trimmedSql); + case "CREATE": + return new ParsedStatement(StatementType.CREATE, trimmedSql); + case "DROP": + return new ParsedStatement(StatementType.DROP, trimmedSql); + case "ALTER": + return new ParsedStatement(StatementType.ALTER, trimmedSql); + case "TRUNCATE": + return new ParsedStatement(StatementType.TRUNCATE, trimmedSql); + case "USE": + return new ParsedStatement(StatementType.USE, trimmedSql); + case "SHOW": + return new ParsedStatement(StatementType.SHOW, trimmedSql); + case "DESCRIBE": + return new ParsedStatement(StatementType.DESCRIBE, trimmedSql); + case "EXPLAIN": + return new ParsedStatement(StatementType.EXPLAIN, trimmedSql); + case "SET": + return new ParsedStatement(StatementType.SET, trimmedSql); + case "KILL": + return new ParsedStatement(StatementType.KILL, trimmedSql); + default: + return new ParsedStatement(StatementType.OTHER, trimmedSql); + } + } + } + + return new ParsedStatement(StatementType.OTHER, trimmedSql); + } + + + // DOT NOT USE: part of a parsing algorithm. + static String extractTableNameFromSegment(String segment) { + StringBuilder tableNameBuilder = new StringBuilder(); + char openedQuote = 0; + boolean escaping = false; + + for (char ch : segment.toCharArray()) { + if (escaping) { + tableNameBuilder.append(ch); + escaping = false; + continue; + } + if (openedQuote == ch) { + break; + } + switch (ch) { + case ' ': + continue; + case '\\': + escaping = true; + tableNameBuilder.append(ch); + break; + case '"': + case '`': + openedQuote = ch; + continue; + default: + tableNameBuilder.append(ch); + } + } + return tableNameBuilder.toString(); + } + + public static class ParsedStatement { + private final StatementType type; + private final String trimmedSql; + private String tableName; + private String[] sqlSegments; + private int argumentCount; + private boolean hasColumnList; + + ParsedStatement(StatementType type, String trimmedSql) { + this.type = type; + this.trimmedSql = trimmedSql; + } + + void setTableName(String tableName) { + this.tableName = tableName; + } + + public void setSqlSegments(String[] sqlSegments) { + this.sqlSegments = sqlSegments; + this.argumentCount = sqlSegments == null ? 0 : sqlSegments.length -1; + } + + public String[] getSqlSegments() { + return this.sqlSegments; + } + + public StatementType getType() { + return this.type; + } + + public String getTableName() { + return tableName; + } + + public int getArgumentCount() { + return argumentCount; + } + + public void setHasColumnList(boolean hasColumnList) { + this.hasColumnList = hasColumnList; + } + + public boolean hasColumnList() { + return hasColumnList; + } + } + + private static String[] splitStatement(String sql) { + List segments = new ArrayList<>(); + char[] chars = sql.toCharArray(); + int segmentStart = 0; + for (int i = 0; i < chars.length; i++) { + char c = chars[i]; + if (c == '\'' || c == '"' || c == '`') { + // string literal or identifier + i = skip(chars, i + 1, c, true); + } else if (c == '/' && lookahead(chars, i) == '*') { + // block comment + int end = sql.indexOf("*/", i); + if (end == -1) { + // missing comment end + break; + } + i = end + 1; + } else if (c == '#' || (c == '-' && lookahead(chars, i) == '-')) { + // line comment + i = skip(chars, i + 1, '\n', false); + } else if (c == '?') { + // question mark + segments.add(sql.substring(segmentStart, i)); + segmentStart = i + 1; + } + } + if (segmentStart < chars.length) { + segments.add(sql.substring(segmentStart)); + } else { + // add empty segment in case question mark was last char of sql + segments.add(""); + } + return segments.toArray(new String[0]); + } + + private static int skip(char[] chars, int from, char until, boolean escape) { + for (int i = from; i < chars.length; i++) { + char curr = chars[i]; + if (escape) { + char next = lookahead(chars, i); + if ((curr == '\\' && (next == '\\' || next == until)) || (curr == until && next == until)) { + // should skip: + // 1) double \\ (backslash escaped with backslash) + // 2) \[until] ([until] char, escaped with backslash) + // 3) [until][until] ([until] char, escaped with [until]) + i++; + continue; + } + } + + if (curr == until) { + return i; + } + } + return chars.length; + } + + private static char lookahead(char[] chars, int pos) { + pos = pos + 1; + if (pos >= chars.length) { + return '\0'; + } + return chars[pos]; + } +} diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/PreparedStatementTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/PreparedStatementTest.java index f48408ea0..f9063a88c 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/PreparedStatementTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/PreparedStatementTest.java @@ -1,6 +1,6 @@ package com.clickhouse.jdbc; -import com.clickhouse.client.api.query.QuerySettings; +import com.clickhouse.jdbc.internal.DriverProperties; import org.apache.commons.lang3.RandomStringUtils; import org.testng.Assert; import org.testng.annotations.DataProvider; @@ -14,8 +14,17 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Timestamp; import java.sql.Types; -import java.util.*; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.GregorianCalendar; +import java.util.Properties; +import java.util.Random; +import java.util.TimeZone; +import java.util.UUID; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -23,9 +32,6 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - - - public class PreparedStatementTest extends JdbcIntegrationTest { @Test(groups = { "integration" }) @@ -360,7 +366,7 @@ void testGetMetadata(String sql, int colCountBeforeExecution, Object[] values, @DataProvider(name = "testGetMetadataDataProvider") static Object[][] testGetMetadataDataProvider() { return new Object[][] { - {"INSERT INTO `%s` VALUES (?, ?, ?)", 0, new Object[]{"test", 0.3, 0.4}, 0}, + {"INSERT INTO `%s` VALUES (?, ?, ?)", 0, new Object[]{"test", 0.3f, 0.4f}, 0}, {"SELECT * FROM `%s`", 3, null, 3}, {"SHOW TABLES", 0, null, 1} }; @@ -556,28 +562,96 @@ void testStatementSplit() throws Exception { @Test(groups = {"integration"}) void testClearParameters() throws Exception { - String sql = "insert into `test_issue_2299` (`id`, `name`, `age`) values (?, ?, ?)"; - try (Connection conn = getJdbcConnection(); - PreparedStatementImpl ps = (PreparedStatementImpl) conn.prepareStatement(sql)) { + final String sql = "insert into `test_issue_2299` (`id`, `name`, `age`) values (?, ?, ?)"; + + + try (Connection conn = getJdbcConnection();) { try (Statement stmt = conn.createStatement()) { stmt.execute("CREATE TABLE IF NOT EXISTS `test_issue_2299` (`id` Nullable(String), `name` Nullable(String), `age` Int32) ENGINE Memory;"); } - Assert.assertEquals(ps.getParametersCount(), 3); + try (PreparedStatementImpl ps = (PreparedStatementImpl) conn.prepareStatement(sql)) { - ps.setString(1, "testId"); - ps.setString(2, "testName"); - ps.setInt(3, 18); - ps.execute(); + Assert.assertEquals(ps.getParametersCount(), 3); - ps.clearParameters(); - Assert.assertEquals(ps.getParametersCount(), 3); + ps.setString(1, "testId"); + ps.setString(2, "testName"); + ps.setInt(3, 18); + ps.execute(); - ps.setString(1, "testId2"); - ps.setString(2, "testName2"); - ps.setInt(3, 19); - ps.execute(); + ps.clearParameters(); + Assert.assertEquals(ps.getParametersCount(), 3); + + ps.setString(1, "testId2"); + ps.setString(2, "testName2"); + ps.setInt(3, 19); + ps.execute(); + } + } + } + + @Test + void testBatchInsert() throws Exception { + String table = "test_batch"; + long seed = System.currentTimeMillis(); + Random rnd = new Random(seed); + System.out.println("testBatchInsert seed" + seed); + Properties properties = new Properties(); + properties.put(DriverProperties.BETA_ROW_BINARY_WRITER.getKey(), "true"); + try (Connection conn = getJdbcConnection(properties)) { + + try (Statement stmt = conn.createStatement()) { + stmt.execute("CREATE TABLE IF NOT EXISTS " + table + + " ( ts DateTime, v1 Int32, v2 Float32, v3 Int32) Engine MergeTree ORDER BY ()"); + } + + String[] sql = new String[]{ + "INSERT INTO \n `%s` \nVALUES (?, ?, multiply(?, 10), ?)", // only string possible + "INSERT INTO\n `%s` \nVALUES (?, ?, ?, ?)", // row binary writer + " INSERT INTO %s (ts, v1, v2, v3) VALUES (?, ?, ?, ?)", // only string supported now + }; + Class[] impl = new Class[]{ + PreparedStatementImpl.class, + WriterStatementImpl.class, + PreparedStatementImpl.class + }; + + for (int i = 0; i < sql.length; i++) { + final int nBatches = 10; + try (PreparedStatement stmt = conn.prepareStatement(String.format(sql[i], table))) { + Assert.assertEquals(stmt.getClass(), impl[i]); + for (int bI = 0; bI < nBatches; bI++) { + stmt.setTimestamp(1, Timestamp.valueOf(LocalDateTime.now())); + stmt.setInt(2, rnd.nextInt()); + stmt.setFloat(3, rnd.nextFloat()); + stmt.setInt(4, rnd.nextInt()); + stmt.addBatch(); + } + + int[] result = stmt.executeBatch(); + for (int r : result) { + Assert.assertEquals(r, 1); + } + } + + try (Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM " + table);) { + + int count = 0; + while (rs.next()) { + Timestamp ts = rs.getTimestamp(1); + assertNotNull(ts); + assertTrue(rs.getInt(2) != 0); + assertTrue(rs.getFloat(3) != 0.0f); + assertTrue(rs.getInt(4) != 0); + count++; + } + assertEquals(count, nBatches); + + stmt.execute("TRUNCATE " + table); + } + } } } diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java index 0fe8eeb8e..70da50fe5 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java @@ -433,35 +433,6 @@ public void testGettingArrays() throws Exception { } } - @Test(groups = { "integration" }) - public void testWithComments() throws Exception { - assertEquals(StatementImpl.parseStatementType(" /* INSERT TESTING */\n SELECT 1 AS num"), StatementImpl.StatementType.SELECT); - assertEquals(StatementImpl.parseStatementType("/* SELECT TESTING */\n INSERT INTO test_table VALUES (1)"), StatementImpl.StatementType.INSERT); - assertEquals(StatementImpl.parseStatementType("/* INSERT TESTING */\n\n\n UPDATE test_table SET num = 2"), StatementImpl.StatementType.UPDATE); - assertEquals(StatementImpl.parseStatementType("-- INSERT TESTING */\n SELECT 1 AS num"), StatementImpl.StatementType.SELECT); - assertEquals(StatementImpl.parseStatementType(" -- SELECT TESTING \n -- SELECT AGAIN \n INSERT INTO test_table VALUES (1)"), StatementImpl.StatementType.INSERT); - assertEquals(StatementImpl.parseStatementType(" SELECT 42 -- INSERT TESTING"), StatementImpl.StatementType.SELECT); - assertEquals(StatementImpl.parseStatementType("#! INSERT TESTING \n SELECT 1 AS num"), StatementImpl.StatementType.SELECT); - assertEquals(StatementImpl.parseStatementType("#!INSERT TESTING \n SELECT 1 AS num"), StatementImpl.StatementType.SELECT); - assertEquals(StatementImpl.parseStatementType("# INSERT TESTING \n SELECT 1 AS num"), StatementImpl.StatementType.SELECT); - assertEquals(StatementImpl.parseStatementType("#INSERT TESTING \n SELECT 1 AS num"), StatementImpl.StatementType.SELECT); - assertEquals(StatementImpl.parseStatementType("\nINSERT TESTING \n SELECT 1 AS num"), StatementImpl.StatementType.INSERT); - assertEquals(StatementImpl.parseStatementType(" \n INSERT TESTING \n SELECT 1 AS num"), StatementImpl.StatementType.INSERT); - assertEquals(StatementImpl.parseStatementType("select 1 AS num"), StatementImpl.StatementType.SELECT); - assertEquals(StatementImpl.parseStatementType("insert into test_table values (1)"), StatementImpl.StatementType.INSERT); - assertEquals(StatementImpl.parseStatementType("update test_table set num = 2"), StatementImpl.StatementType.UPDATE); - assertEquals(StatementImpl.parseStatementType("delete from test_table where num = 2"), StatementImpl.StatementType.DELETE); - assertEquals(StatementImpl.parseStatementType("sElEcT 1 AS num"), StatementImpl.StatementType.SELECT); - assertEquals(StatementImpl.parseStatementType(null), StatementImpl.StatementType.OTHER); - assertEquals(StatementImpl.parseStatementType(""), StatementImpl.StatementType.OTHER); - assertEquals(StatementImpl.parseStatementType(" "), StatementImpl.StatementType.OTHER); - } - - @Test(groups = { "integration" }) - public void testParseStatementWithClause() throws Exception { - assertEquals(StatementImpl.parseStatementType("with data as (SELECT number FROM numbers(100)) select * from data"), StatementImpl.StatementType.SELECT); - } - @Test(groups = { "integration" }) public void testWithIPs() throws Exception { diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/internal/JdbcUtilsTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/internal/JdbcUtilsTest.java index 009bff49d..06601c2e6 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/internal/JdbcUtilsTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/internal/JdbcUtilsTest.java @@ -59,6 +59,16 @@ public void testEscapeQuotes() { } } + @Test + public void testUnQuoteTableName() { + String[] names = new String[]{"test", "`test name1`", "\"test name 2\""}; + String[] expected = new String[]{"test", "test name1", "test name 2"}; + + for (int i = 0; i < names.length; i++) { + assertEquals(JdbcUtils.unQuoteTableName(names[i]), expected[i]); + } + } + @Test(dataProvider = "testReplaceQuestionMark_dataProvider") public void testReplaceQuestionMark(String sql, String result) { assertEquals(JdbcUtils.replaceQuestionMarks(sql, "NULL"), result); diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/internal/StatementParserTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/internal/StatementParserTest.java new file mode 100644 index 000000000..1cbcc6836 --- /dev/null +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/internal/StatementParserTest.java @@ -0,0 +1,40 @@ +package com.clickhouse.jdbc.internal; + + +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class StatementParserTest { + + @Test(groups = {"integration"}) + public void testWithComments() throws Exception { + assertEquals(StatementParser.parseStatementType(" /* INSERT TESTING */\n SELECT 1 AS num").getType(), StatementParser.StatementType.SELECT); + assertEquals(StatementParser.parseStatementType("/* SELECT TESTING */\n INSERT INTO test_table VALUES (1)").getType(), StatementParser.StatementType.INSERT); + assertEquals(StatementParser.parseStatementType("/* INSERT TESTING */\n\n\n UPDATE test_table SET num = 2").getType(), StatementParser.StatementType.UPDATE); + assertEquals(StatementParser.parseStatementType("-- INSERT TESTING */\n SELECT 1 AS num").getType(), StatementParser.StatementType.SELECT); + assertEquals(StatementParser.parseStatementType(" -- SELECT TESTING \n -- SELECT AGAIN \n INSERT INTO test_table VALUES (1)").getType(), StatementParser.StatementType.INSERT); + assertEquals(StatementParser.parseStatementType(" SELECT 42 -- INSERT TESTING").getType(), StatementParser.StatementType.SELECT); + assertEquals(StatementParser.parseStatementType("#! INSERT TESTING \n SELECT 1 AS num").getType(), StatementParser.StatementType.SELECT); + assertEquals(StatementParser.parseStatementType("#!INSERT TESTING \n SELECT 1 AS num").getType(), StatementParser.StatementType.SELECT); + assertEquals(StatementParser.parseStatementType("# INSERT TESTING \n SELECT 1 AS num").getType(), StatementParser.StatementType.SELECT); + assertEquals(StatementParser.parseStatementType("#INSERT TESTING \n SELECT 1 AS num").getType(), StatementParser.StatementType.SELECT); + assertEquals(StatementParser.parseStatementType("\nINSERT TESTING \n SELECT 1 AS num").getType(), StatementParser.StatementType.INSERT_INTO_SELECT); + assertEquals(StatementParser.parseStatementType(" \n INSERT TESTING \n SELECT 1 AS num").getType(), StatementParser.StatementType.INSERT_INTO_SELECT); + assertEquals(StatementParser.parseStatementType("INSERT INTO t SELECT 1 AS num").getType(), StatementParser.StatementType.INSERT_INTO_SELECT); + assertEquals(StatementParser.parseStatementType("select 1 AS num").getType(), StatementParser.StatementType.SELECT); + assertEquals(StatementParser.parseStatementType("insert into test_table values (1)").getType(), StatementParser.StatementType.INSERT); + assertEquals(StatementParser.parseStatementType("update test_table set num = 2").getType(), StatementParser.StatementType.UPDATE); + assertEquals(StatementParser.parseStatementType("delete from test_table where num = 2").getType(), StatementParser.StatementType.DELETE); + assertEquals(StatementParser.parseStatementType("sElEcT 1 AS num").getType(), StatementParser.StatementType.SELECT); + assertEquals(StatementParser.parseStatementType(null).getType(), StatementParser.StatementType.OTHER); + assertEquals(StatementParser.parseStatementType("").getType(), StatementParser.StatementType.OTHER); + assertEquals(StatementParser.parseStatementType(" ").getType(), StatementParser.StatementType.OTHER); + } + + @Test(groups = {"integration"}) + public void testParseStatementWithClause() throws Exception { + assertEquals(StatementParser.parseStatementType("with data as (SELECT number FROM numbers(100)) select * from data").getType(), StatementParser.StatementType.SELECT); + } + +} \ No newline at end of file diff --git a/performance/README.md b/performance/README.md index 830d28d83..ce0f72e17 100644 --- a/performance/README.md +++ b/performance/README.md @@ -43,4 +43,5 @@ Other options: - "writer" - Serializer - serialization only logic benchmarks - "reader" - DeSerilalizer - deserialization only logic benchmarks - "mixed" - MixedWorkload - \ No newline at end of file + - "jq" - JDBCQuery - query operations using JDBC + - "ji" - JDBCInsert - insert operation using JDBC \ No newline at end of file diff --git a/performance/src/main/java/com/clickhouse/benchmark/BenchmarkRunner.java b/performance/src/main/java/com/clickhouse/benchmark/BenchmarkRunner.java index 66159a8ca..e8095a361 100644 --- a/performance/src/main/java/com/clickhouse/benchmark/BenchmarkRunner.java +++ b/performance/src/main/java/com/clickhouse/benchmark/BenchmarkRunner.java @@ -95,6 +95,8 @@ private static Map buildBenchmarkFlags() { map.put("reader", Deserializers.class.getName()); map.put("writer", Serializers.class.getName()); map.put("mixed", MixedWorkload.class.getName()); + map.put("jq", JDBCQuery.class.getName()); + map.put("ji", JDBCInsert.class.getName()); return map; } diff --git a/performance/src/main/java/com/clickhouse/benchmark/clients/BenchmarkBase.java b/performance/src/main/java/com/clickhouse/benchmark/clients/BenchmarkBase.java index 4971819dd..d0742859e 100644 --- a/performance/src/main/java/com/clickhouse/benchmark/clients/BenchmarkBase.java +++ b/performance/src/main/java/com/clickhouse/benchmark/clients/BenchmarkBase.java @@ -4,18 +4,31 @@ import com.clickhouse.benchmark.data.FileDataSet; import com.clickhouse.benchmark.data.SimpleDataSet; import com.clickhouse.benchmark.data.SyntheticDataSet; -import com.clickhouse.client.*; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseNodeSelector; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientConfigProperties; import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.insert.InsertResponse; import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.client.config.ClickHouseDefaults; import com.clickhouse.data.ClickHouseDataProcessor; import com.clickhouse.data.ClickHouseFormat; import com.clickhouse.data.ClickHouseOutputStream; import com.clickhouse.data.ClickHouseRecord; import com.clickhouse.data.format.ClickHouseRowBinaryProcessor; import com.clickhouse.jdbc.ClickHouseDriver; -import org.openjdk.jmh.annotations.*; +import com.clickhouse.jdbc.internal.DriverProperties; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +43,13 @@ import java.util.List; import java.util.Properties; -import static com.clickhouse.benchmark.TestEnvironment.*; +import static com.clickhouse.benchmark.TestEnvironment.DB_NAME; +import static com.clickhouse.benchmark.TestEnvironment.cleanupEnvironment; +import static com.clickhouse.benchmark.TestEnvironment.getPassword; +import static com.clickhouse.benchmark.TestEnvironment.getServer; +import static com.clickhouse.benchmark.TestEnvironment.getUsername; +import static com.clickhouse.benchmark.TestEnvironment.isCloud; +import static com.clickhouse.benchmark.TestEnvironment.setupEnvironment; @State(Scope.Benchmark) public class BenchmarkBase { @@ -264,8 +283,9 @@ private static String jdbcURLV2(boolean isCloud) { protected static Connection getJdbcV1() { Properties properties = new Properties(); - properties.put("user", getUsername()); - properties.put("password", getPassword()); + properties.put(ClickHouseDefaults.USER.getKey(), getUsername()); + properties.put(ClickHouseDefaults.PASSWORD.getKey(), getPassword()); + properties.put(ClickHouseDefaults.DATABASE.getKey(), DB_NAME); Connection jdbcV1 = null; String jdbcURL = jdbcURLV1(isCloud()); @@ -280,8 +300,10 @@ protected static Connection getJdbcV1() { protected static Connection getJdbcV2() { Properties properties = new Properties(); - properties.put("user", getUsername()); - properties.put("password", getPassword()); + properties.put(ClientConfigProperties.USER.getKey(), getUsername()); + properties.put(ClientConfigProperties.PASSWORD.getKey(), getPassword()); + properties.put(DriverProperties.BETA_ROW_BINARY_WRITER.getKey(), "true"); + properties.put(ClientConfigProperties.DATABASE.getKey(), DB_NAME); Connection jdbcV2 = null; String jdbcURL = jdbcURLV2(isCloud()); diff --git a/performance/src/main/java/com/clickhouse/benchmark/clients/JDBCInsert.java b/performance/src/main/java/com/clickhouse/benchmark/clients/JDBCInsert.java index 51916742c..c74fd2c6b 100644 --- a/performance/src/main/java/com/clickhouse/benchmark/clients/JDBCInsert.java +++ b/performance/src/main/java/com/clickhouse/benchmark/clients/JDBCInsert.java @@ -12,8 +12,6 @@ import java.util.List; import java.util.stream.Collectors; -import static com.clickhouse.benchmark.TestEnvironment.DB_NAME; - public class JDBCInsert extends BenchmarkBase { private static final Logger LOGGER = LoggerFactory.getLogger(JDBCInsert.class); @TearDown(Level.Invocation) @@ -39,9 +37,8 @@ public void verifyRowsInsertedAndCleanup(DataState dataState) { } void insetData(Connection connection, DataState dataState) throws SQLException { int size = dataState.dataSet.getSchema().getColumns().size(); - String names = dataState.dataSet.getSchema().getColumns().stream().map(column -> column.getColumnName()).collect(Collectors.joining(",")); String values = dataState.dataSet.getSchema().getColumns().stream().map(column -> "?").collect(Collectors.joining(",")); - String sql = String.format("INSERT INTO `%s`.`%s` (%s) VALUES (%s)", DB_NAME ,dataState.tableNameEmpty, names, values); + String sql = String.format("INSERT INTO `%s` VALUES (%s)", dataState.tableNameEmpty, values); LOGGER.info("SQL: " + sql); PreparedStatement preparedStatement = connection.prepareStatement(sql); for (List data : dataState.dataSet.getRowsOrdered()) {