From 2eaa6f63b03c575b5b1f8cf4398ae2e53046d284 Mon Sep 17 00:00:00 2001 From: Denis Zhuravlev Date: Wed, 3 Feb 2021 16:47:33 -0400 Subject: [PATCH 1/2] ability to send compressed files/streams --- README.md | 3 +- .../clickhouse/ClickHouseStatementImpl.java | 3 + .../java/ru/yandex/clickhouse/Writer.java | 23 ++- .../domain/ClickHouseCompression.java | 9 + .../clickhouse/integration/StreamSQLTest.java | 164 ++++++++++++++++-- 5 files changed, 183 insertions(+), 19 deletions(-) create mode 100644 src/main/java/ru/yandex/clickhouse/domain/ClickHouseCompression.java diff --git a/README.md b/README.md index 870ff57eb..ff31548bc 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ sth .write() // Write API entrypoint .table("default.my_table") // where to write data .option("format_csv_delimiter", ";") // specific param - .data(new File("/path/to/file.csv"), ClickHouseFormat.CSV) // specify input + .data(new File("/path/to/file.csv.gz"), ClickHouseFormat.CSV, ClickHouseCompression.gzip) // specify input .send(); ``` #### Configurable send @@ -46,6 +46,7 @@ sth .write() .sql("INSERT INTO default.my_table (a,b,c)") .data(new MyCustomInputStream(), ClickHouseFormat.JSONEachRow) + .dataCompression(ClickHouseCompression.brotli) .addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, 2) .send(); ``` diff --git a/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java b/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java index a6ce8afc1..19bcc402a 100644 --- a/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java +++ b/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java @@ -892,6 +892,9 @@ void sendStream(Writer writer, HttpEntity content) throws ClickHouseException { HttpPost httpPost = new HttpPost(uri); + if (writer.getCompression() != null) { + httpPost.addHeader("Content-Encoding", writer.getCompression().name()); + } httpPost.setEntity(content); HttpResponse response = client.execute(httpPost); entity = response.getEntity(); 
diff --git a/src/main/java/ru/yandex/clickhouse/Writer.java b/src/main/java/ru/yandex/clickhouse/Writer.java index f61b643b9..a93ea8fa6 100644 --- a/src/main/java/ru/yandex/clickhouse/Writer.java +++ b/src/main/java/ru/yandex/clickhouse/Writer.java @@ -2,6 +2,7 @@ import org.apache.http.HttpEntity; import org.apache.http.entity.InputStreamEntity; +import ru.yandex.clickhouse.domain.ClickHouseCompression; import ru.yandex.clickhouse.domain.ClickHouseFormat; import ru.yandex.clickhouse.util.ClickHouseStreamCallback; import ru.yandex.clickhouse.util.ClickHouseStreamHttpEntity; @@ -17,7 +18,7 @@ public class Writer extends ConfigurableApi { private ClickHouseFormat format = TabSeparated; - + private ClickHouseCompression compression = null; private String table = null; private String sql = null; private InputStreamProvider streamProvider = null; @@ -81,6 +82,22 @@ public Writer data(File input) { return this; } + public Writer data(InputStream stream, ClickHouseFormat format, ClickHouseCompression compression) { + return dataCompression(compression).format(format).data(stream); + } + + public Writer data(File input, ClickHouseFormat format, ClickHouseCompression compression) { + return dataCompression(compression).format(format).data(input); + } + + public Writer dataCompression(ClickHouseCompression compression) { + if (null == compression) { + throw new NullPointerException("Compression can not be null"); + } + this.compression = compression; + return this; + } + public Writer data(File input, ClickHouseFormat format) { return format(format).data(input); } @@ -184,4 +201,8 @@ public InputStream get() throws IOException { return stream; } } + + public ClickHouseCompression getCompression() { + return compression; + } } diff --git a/src/main/java/ru/yandex/clickhouse/domain/ClickHouseCompression.java b/src/main/java/ru/yandex/clickhouse/domain/ClickHouseCompression.java new file mode 100644 index 000000000..d9269d631 --- /dev/null +++ 
b/src/main/java/ru/yandex/clickhouse/domain/ClickHouseCompression.java @@ -0,0 +1,9 @@ +package ru.yandex.clickhouse.domain; + +public enum ClickHouseCompression { + none, + gzip, + brotli, + deflate, + zstd; +} diff --git a/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java b/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java index 6975c9e12..ccfe1d2a9 100644 --- a/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java +++ b/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java @@ -6,11 +6,14 @@ import ru.yandex.clickhouse.ClickHouseConnection; import ru.yandex.clickhouse.ClickHouseContainerForTest; import ru.yandex.clickhouse.ClickHouseDataSource; - +import ru.yandex.clickhouse.domain.ClickHouseCompression; +import ru.yandex.clickhouse.domain.ClickHouseFormat; +import ru.yandex.clickhouse.settings.ClickHouseProperties; import java.io.*; import java.nio.charset.Charset; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.zip.GZIPOutputStream; public class StreamSQLTest { private ClickHouseDataSource dataSource; @@ -33,7 +36,11 @@ public void simpleCSVInsert() throws SQLException { String string = "5,6\n1,6"; InputStream inputStream = new ByteArrayInputStream(string.getBytes(Charset.forName("UTF-8"))); - connection.createStatement().sendStreamSQL(inputStream, "insert into test.csv_stream_sql format CSV"); + connection.createStatement(). 
+ write() + .sql("insert into test.csv_stream_sql format CSV") + .data(inputStream) + .send(); ResultSet rs = connection.createStatement().executeQuery( "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.csv_stream_sql"); @@ -43,31 +50,21 @@ public void simpleCSVInsert() throws SQLException { Assert.assertEquals(rs.getLong("uniq"), 1); } - @Test - public void multiRowTSVInsert() throws SQLException { - connection.createStatement().execute("DROP TABLE IF EXISTS test.tsv_stream_sql"); - connection.createStatement().execute( - "CREATE TABLE test.tsv_stream_sql (value Int32, string_value String) ENGINE = Log()" - ); - - - final int rowsCount = 100000; - - InputStream in = new InputStream() { + private InputStream getTSVStream(final int rowsCount) { + return new InputStream() { private int si = 0; private String s = ""; private int i = 0; - private final int count = rowsCount; private boolean genNextString() { - if (i >= count) return false; + if (i >= rowsCount) return false; si = 0; s = String.format("%d\txxxx%d\n", 1, i); i++; return true; } - public int read() throws IOException { + public int read() { if (si >= s.length()) { if ( ! genNextString() ) { return -1; @@ -76,8 +73,22 @@ public int read() throws IOException { return s.charAt( si++ ); } }; + } + + @Test + public void multiRowTSVInsert() throws SQLException { + connection.createStatement().execute("DROP TABLE IF EXISTS test.tsv_stream_sql"); + connection.createStatement().execute( + "CREATE TABLE test.tsv_stream_sql (value Int32, string_value String) ENGINE = Log()" + ); + + final int rowsCount = 100000; - connection.createStatement().sendStreamSQL(in, "insert into test.tsv_stream_sql format TSV"); + connection.createStatement(). 
+ write() + .sql("insert into test.tsv_stream_sql format TSV") + .data(getTSVStream(rowsCount), ClickHouseFormat.TSV) + .send(); ResultSet rs = connection.createStatement().executeQuery( "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.tsv_stream_sql"); @@ -87,4 +98,123 @@ public int read() throws IOException { Assert.assertEquals(rs.getInt("uniq"), rowsCount); } + private InputStream gzStream( InputStream is ) throws IOException + { + final int bufferSize = 16384; + byte data[] = new byte[bufferSize]; + ByteArrayOutputStream os = new ByteArrayOutputStream(); + GZIPOutputStream gzipOutputStream = new GZIPOutputStream(os); + BufferedInputStream es = new BufferedInputStream(is, bufferSize); + int count; + while ( ( count = es.read( data, 0, bufferSize) ) != -1 ) + gzipOutputStream.write( data, 0, count ); + es.close(); + gzipOutputStream.close(); + + return new ByteArrayInputStream( os.toByteArray() ); + } + + @Test + public void multiRowTSVInsertCompressed() throws SQLException, IOException { + connection.createStatement().execute("DROP TABLE IF EXISTS test.tsv_compressed_stream_sql"); + connection.createStatement().execute( + "CREATE TABLE test.tsv_compressed_stream_sql (value Int32, string_value String) ENGINE = Log()" + ); + + final int rowsCount = 100000; + + InputStream gz = gzStream(getTSVStream(rowsCount)); + connection.createStatement(). 
+ write() + .sql("insert into test.tsv_compressed_stream_sql format TSV") + .data(gz, ClickHouseFormat.TSV, ClickHouseCompression.gzip) + .send(); + + ResultSet rs = connection.createStatement().executeQuery( + "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.tsv_compressed_stream_sql"); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt("cnt"), rowsCount); + Assert.assertEquals(rs.getInt("sum"), rowsCount); + Assert.assertEquals(rs.getInt("uniq"), rowsCount); + } + + + @Test + public void JSONEachRowInsert() throws SQLException { + connection.createStatement().execute("DROP TABLE IF EXISTS test.json_stream_sql"); + connection.createStatement().execute( + "CREATE TABLE test.json_stream_sql (value Int32, string_value String) ENGINE = Log()" + ); + + String string = "{\"value\":5,\"string_value\":\"6\"}\n{\"value\":1,\"string_value\":\"6\"}"; + InputStream inputStream = new ByteArrayInputStream(string.getBytes(Charset.forName("UTF-8"))); + + connection.createStatement(). 
+ write() + .sql("insert into test.json_stream_sql") + .data(inputStream, ClickHouseFormat.JSONEachRow) + .data(inputStream) + .dataCompression(ClickHouseCompression.none) + .send(); + + ResultSet rs = connection.createStatement().executeQuery( + "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.json_stream_sql"); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt("cnt"), 2); + Assert.assertEquals(rs.getLong("sum"), 6); + Assert.assertEquals(rs.getLong("uniq"), 1); + } + + @Test + public void JSONEachRowCompressedInsert() throws SQLException, IOException { + connection.createStatement().execute("DROP TABLE IF EXISTS test.json_compressed_stream_sql"); + connection.createStatement().execute( + "CREATE TABLE test.json_compressed_stream_sql (value Int32, string_value String) ENGINE = Log()" + ); + + String string = "{\"value\":5,\"string_value\":\"6\"}\n{\"value\":1,\"string_value\":\"6\"}"; + InputStream inputStream = new ByteArrayInputStream(string.getBytes(Charset.forName("UTF-8"))); + + connection.createStatement(). 
+ write() + .sql("insert into test.json_compressed_stream_sql") + .data(inputStream, ClickHouseFormat.JSONEachRow) + .data(gzStream(inputStream)) + .dataCompression(ClickHouseCompression.gzip) + .send(); + + ResultSet rs = connection.createStatement().executeQuery( + "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.json_compressed_stream_sql"); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt("cnt"), 2); + Assert.assertEquals(rs.getLong("sum"), 6); + Assert.assertEquals(rs.getLong("uniq"), 1); + } + + @Test + public void CSVInsertCompressedIntoTable() throws SQLException, IOException { + connection.createStatement().execute("DROP TABLE IF EXISTS test.csv_stream_compressed"); + connection.createStatement().execute( + "CREATE TABLE test.csv_stream_compressed (value Int32, string_value String) ENGINE = Log()" + ); + + String string = "5,6\n1,6"; + InputStream inputStream = new ByteArrayInputStream(string.getBytes(Charset.forName("UTF-8"))); + + connection.createStatement(). 
+ write() + .table("test.csv_stream_compressed") + .format(ClickHouseFormat.CSV) + .dataCompression(ClickHouseCompression.gzip) + .data(gzStream(inputStream)) + .send(); + + ResultSet rs = connection.createStatement().executeQuery( + "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.csv_stream_compressed"); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt("cnt"), 2); + Assert.assertEquals(rs.getLong("sum"), 6); + Assert.assertEquals(rs.getLong("uniq"), 1); + } + } From 11072267a5bd3bc24a87eca40459c54ded1b12bc Mon Sep 17 00:00:00 2001 From: Denis Zhuravlev Date: Wed, 3 Feb 2021 16:54:21 -0400 Subject: [PATCH 2/2] fix test against 19.14 --- .../java/ru/yandex/clickhouse/integration/StreamSQLTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java b/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java index ccfe1d2a9..210e75f4e 100644 --- a/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java +++ b/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java @@ -154,7 +154,6 @@ public void JSONEachRowInsert() throws SQLException { .sql("insert into test.json_stream_sql") .data(inputStream, ClickHouseFormat.JSONEachRow) .data(inputStream) - .dataCompression(ClickHouseCompression.none) .send(); ResultSet rs = connection.createStatement().executeQuery(