Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ sth
.write() // Write API entrypoint
.table("default.my_table") // where to write data
.option("format_csv_delimiter", ";") // specific param
.data(new File("/path/to/file.csv"), ClickHouseFormat.CSV) // specify input
.data(new File("/path/to/file.csv.gz"), ClickHouseFormat.CSV, ClickHouseCompression.gzip) // specify input
.send();
```
#### Configurable send
Expand All @@ -46,6 +46,7 @@ sth
.write()
.sql("INSERT INTO default.my_table (a,b,c)")
.data(new MyCustomInputStream(), ClickHouseFormat.JSONEachRow)
.dataCompression(ClickHouseCompression.brotli)
.addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, 2)
.send();
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,9 @@ void sendStream(Writer writer, HttpEntity content) throws ClickHouseException {

HttpPost httpPost = new HttpPost(uri);

if (writer.getCompression() != null) {
httpPost.addHeader("Content-Encoding", writer.getCompression().name());
}
httpPost.setEntity(content);
HttpResponse response = client.execute(httpPost);
entity = response.getEntity();
Expand Down
23 changes: 22 additions & 1 deletion src/main/java/ru/yandex/clickhouse/Writer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.apache.http.HttpEntity;
import org.apache.http.entity.InputStreamEntity;
import ru.yandex.clickhouse.domain.ClickHouseCompression;
import ru.yandex.clickhouse.domain.ClickHouseFormat;
import ru.yandex.clickhouse.util.ClickHouseStreamCallback;
import ru.yandex.clickhouse.util.ClickHouseStreamHttpEntity;
Expand All @@ -17,7 +18,7 @@
public class Writer extends ConfigurableApi<Writer> {

private ClickHouseFormat format = TabSeparated;

private ClickHouseCompression compression = null;
private String table = null;
private String sql = null;
private InputStreamProvider streamProvider = null;
Expand Down Expand Up @@ -81,6 +82,22 @@ public Writer data(File input) {
return this;
}

/**
 * Configures compression, format and the input stream in one call.
 *
 * @param stream      the stream to read the payload from
 * @param format      the format of the payload
 * @param compression the compression already applied to the payload; must not be {@code null}
 * @return the writer returned by the final {@code data(stream)} call
 * @throws NullPointerException if {@code compression} is {@code null}
 */
public Writer data(InputStream stream, ClickHouseFormat format, ClickHouseCompression compression) {
    return this
        .dataCompression(compression)
        .format(format)
        .data(stream);
}

/**
 * Configures compression, format and the input file in one call.
 *
 * @param input       the file to read the payload from
 * @param format      the format of the payload
 * @param compression the compression already applied to the file; must not be {@code null}
 * @return the writer returned by the final {@code data(input)} call
 * @throws NullPointerException if {@code compression} is {@code null}
 */
public Writer data(File input, ClickHouseFormat format, ClickHouseCompression compression) {
    return this
        .dataCompression(compression)
        .format(format)
        .data(input);
}

/**
 * Declares the compression of the data that will be sent.
 *
 * @param compression the compression of the payload; must not be {@code null}
 * @return this writer, for call chaining
 * @throws NullPointerException if {@code compression} is {@code null}
 */
public Writer dataCompression(ClickHouseCompression compression) {
    if (compression != null) {
        this.compression = compression;
        return this;
    }
    throw new NullPointerException("Compression can not be null");
}

/**
 * Configures the format and the input file in one call.
 *
 * @param input  the file to read the payload from
 * @param format the format of the payload
 * @return the writer returned by the final {@code data(input)} call
 */
public Writer data(File input, ClickHouseFormat format) {
    return this
        .format(format)
        .data(input);
}
Expand Down Expand Up @@ -184,4 +201,8 @@ public InputStream get() throws IOException {
return stream;
}
}

/**
 * Returns the compression configured through {@code dataCompression}.
 *
 * @return the configured compression, or {@code null} when none has been set
 */
public ClickHouseCompression getCompression() {
    return this.compression;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ru.yandex.clickhouse.domain;

/**
 * Compression methods that can be declared for data sent through the write API.
 *
 * NOTE(review): sendStream passes {@link #name()} verbatim as the
 * {@code Content-Encoding} header value. The registered HTTP content coding for
 * Brotli is {@code br}, and {@code none} is not a content coding; confirm the
 * server accepts these names as spelled here.
 */
public enum ClickHouseCompression {
    none,
    gzip,
    brotli,
    deflate,
    zstd
}
163 changes: 146 additions & 17 deletions src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseContainerForTest;
import ru.yandex.clickhouse.ClickHouseDataSource;

import ru.yandex.clickhouse.domain.ClickHouseCompression;
import ru.yandex.clickhouse.domain.ClickHouseFormat;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import java.io.*;
import java.nio.charset.Charset;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.zip.GZIPOutputStream;

public class StreamSQLTest {
private ClickHouseDataSource dataSource;
Expand All @@ -33,7 +36,11 @@ public void simpleCSVInsert() throws SQLException {
String string = "5,6\n1,6";
InputStream inputStream = new ByteArrayInputStream(string.getBytes(Charset.forName("UTF-8")));

connection.createStatement().sendStreamSQL(inputStream, "insert into test.csv_stream_sql format CSV");
connection.createStatement().
write()
.sql("insert into test.csv_stream_sql format CSV")
.data(inputStream)
.send();

ResultSet rs = connection.createStatement().executeQuery(
"SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.csv_stream_sql");
Expand All @@ -43,31 +50,21 @@ public void simpleCSVInsert() throws SQLException {
Assert.assertEquals(rs.getLong("uniq"), 1);
}

@Test
public void multiRowTSVInsert() throws SQLException {
connection.createStatement().execute("DROP TABLE IF EXISTS test.tsv_stream_sql");
connection.createStatement().execute(
"CREATE TABLE test.tsv_stream_sql (value Int32, string_value String) ENGINE = Log()"
);


final int rowsCount = 100000;

InputStream in = new InputStream() {
private InputStream getTSVStream(final int rowsCount) {
return new InputStream() {
private int si = 0;
private String s = "";
private int i = 0;
private final int count = rowsCount;

private boolean genNextString() {
if (i >= count) return false;
if (i >= rowsCount) return false;
si = 0;
s = String.format("%d\txxxx%d\n", 1, i);
i++;
return true;
}

public int read() throws IOException {
public int read() {
if (si >= s.length()) {
if ( ! genNextString() ) {
return -1;
Expand All @@ -76,8 +73,22 @@ public int read() throws IOException {
return s.charAt( si++ );
}
};
}

@Test
public void multiRowTSVInsert() throws SQLException {
connection.createStatement().execute("DROP TABLE IF EXISTS test.tsv_stream_sql");
connection.createStatement().execute(
"CREATE TABLE test.tsv_stream_sql (value Int32, string_value String) ENGINE = Log()"
);

final int rowsCount = 100000;

connection.createStatement().sendStreamSQL(in, "insert into test.tsv_stream_sql format TSV");
connection.createStatement().
write()
.sql("insert into test.tsv_stream_sql format TSV")
.data(getTSVStream(rowsCount), ClickHouseFormat.TSV)
.send();

ResultSet rs = connection.createStatement().executeQuery(
"SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.tsv_stream_sql");
Expand All @@ -87,4 +98,122 @@ public int read() throws IOException {
Assert.assertEquals(rs.getInt("uniq"), rowsCount);
}

/**
 * Reads the given stream to its end, gzip-compresses the bytes in memory and
 * returns a new stream over the compressed bytes.
 *
 * The source stream is always closed, even when reading or compressing fails.
 *
 * @param is the stream to compress; it is fully consumed and closed
 * @return an in-memory stream holding the gzip-compressed content of {@code is}
 * @throws IOException if reading from {@code is} or writing the gzip data fails
 */
private InputStream gzStream(InputStream is) throws IOException {
    final int bufferSize = 16384;
    final byte[] buffer = new byte[bufferSize];
    final ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    final BufferedInputStream source = new BufferedInputStream(is, bufferSize);
    try {
        final GZIPOutputStream gzip = new GZIPOutputStream(compressed);
        try {
            int count;
            while ((count = source.read(buffer, 0, bufferSize)) != -1) {
                gzip.write(buffer, 0, count);
            }
        } finally {
            // Closing finishes the gzip trailer and releases the deflater,
            // so it must happen before the bytes are read back below.
            gzip.close();
        }
    } finally {
        source.close();
    }

    return new ByteArrayInputStream(compressed.toByteArray());
}

/**
 * Sends 100000 gzip-compressed TSV rows through the write API and verifies the
 * row count, the sum of {@code value} and the number of distinct strings.
 */
@Test
public void multiRowTSVInsertCompressed() throws SQLException, IOException {
    final int rowsCount = 100000;

    connection.createStatement().execute("DROP TABLE IF EXISTS test.tsv_compressed_stream_sql");
    connection.createStatement().execute(
        "CREATE TABLE test.tsv_compressed_stream_sql (value Int32, string_value String) ENGINE = Log()"
    );

    // The rows are compressed up front; the writer is only told which compression was used.
    InputStream compressedRows = gzStream(getTSVStream(rowsCount));
    connection.createStatement()
        .write()
        .sql("insert into test.tsv_compressed_stream_sql format TSV")
        .data(compressedRows, ClickHouseFormat.TSV, ClickHouseCompression.gzip)
        .send();

    ResultSet stats = connection.createStatement().executeQuery(
        "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.tsv_compressed_stream_sql");
    Assert.assertTrue(stats.next());
    Assert.assertEquals(stats.getInt("cnt"), rowsCount);
    Assert.assertEquals(stats.getInt("sum"), rowsCount);
    Assert.assertEquals(stats.getInt("uniq"), rowsCount);
}


/**
 * Inserts two JSONEachRow records through the write API and verifies the row
 * count, the sum of {@code value} and the number of distinct strings.
 */
@Test
public void JSONEachRowInsert() throws SQLException {
    connection.createStatement().execute("DROP TABLE IF EXISTS test.json_stream_sql");
    connection.createStatement().execute(
        "CREATE TABLE test.json_stream_sql (value Int32, string_value String) ENGINE = Log()"
    );

    String json = "{\"value\":5,\"string_value\":\"6\"}\n{\"value\":1,\"string_value\":\"6\"}";
    InputStream inputStream = new ByteArrayInputStream(json.getBytes(Charset.forName("UTF-8")));

    // One data(stream, format) call is enough; the stream does not need to be registered a second time.
    connection.createStatement()
        .write()
        .sql("insert into test.json_stream_sql")
        .data(inputStream, ClickHouseFormat.JSONEachRow)
        .send();

    ResultSet rs = connection.createStatement().executeQuery(
        "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.json_stream_sql");
    Assert.assertTrue(rs.next());
    Assert.assertEquals(rs.getInt("cnt"), 2);
    Assert.assertEquals(rs.getLong("sum"), 6);
    Assert.assertEquals(rs.getLong("uniq"), 1);
}

/**
 * Inserts two gzip-compressed JSONEachRow records, configuring format, data and
 * compression through separate writer calls, and verifies the row count, the
 * sum of {@code value} and the number of distinct strings.
 */
@Test
public void JSONEachRowCompressedInsert() throws SQLException, IOException {
    connection.createStatement().execute("DROP TABLE IF EXISTS test.json_comressed_stream_sql");
    connection.createStatement().execute(
        "CREATE TABLE test.json_comressed_stream_sql (value Int32, string_value String) ENGINE = Log()"
    );

    String json = "{\"value\":5,\"string_value\":\"6\"}\n{\"value\":1,\"string_value\":\"6\"}";
    InputStream inputStream = new ByteArrayInputStream(json.getBytes(Charset.forName("UTF-8")));

    // Only the compressed stream is handed to the writer. The raw stream is
    // consumed by gzStream and must not be registered as payload itself.
    connection.createStatement()
        .write()
        .sql("insert into test.json_comressed_stream_sql")
        .format(ClickHouseFormat.JSONEachRow)
        .data(gzStream(inputStream))
        .dataCompression(ClickHouseCompression.gzip)
        .send();

    ResultSet rs = connection.createStatement().executeQuery(
        "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.json_comressed_stream_sql");
    Assert.assertTrue(rs.next());
    Assert.assertEquals(rs.getInt("cnt"), 2);
    Assert.assertEquals(rs.getLong("sum"), 6);
    Assert.assertEquals(rs.getLong("uniq"), 1);
}

/**
 * Inserts two gzip-compressed CSV rows by naming the target table (instead of
 * supplying SQL) and verifies the row count, the sum of {@code value} and the
 * number of distinct strings.
 */
@Test
public void CSVInsertCompressedIntoTable() throws SQLException, IOException {
    connection.createStatement().execute("DROP TABLE IF EXISTS test.csv_stream_compressed");
    connection.createStatement().execute(
        "CREATE TABLE test.csv_stream_compressed (value Int32, string_value String) ENGINE = Log()"
    );

    String csv = "5,6\n1,6";
    InputStream rawCsv = new ByteArrayInputStream(csv.getBytes(Charset.forName("UTF-8")));

    connection.createStatement()
        .write()
        .table("test.csv_stream_compressed")
        .format(ClickHouseFormat.CSV)
        .dataCompression(ClickHouseCompression.gzip)
        .data(gzStream(rawCsv))
        .send();

    ResultSet stats = connection.createStatement().executeQuery(
        "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.csv_stream_compressed");
    Assert.assertTrue(stats.next());
    Assert.assertEquals(stats.getInt("cnt"), 2);
    Assert.assertEquals(stats.getLong("sum"), 6);
    Assert.assertEquals(stats.getLong("uniq"), 1);
}

}