Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -22,10 +23,12 @@
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.AbstractHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.entity.mime.MultipartEntityBuilder;
Expand Down Expand Up @@ -57,6 +60,50 @@ public class ClickHouseStatementImpl extends ConfigurableApi<ClickHouseStatement

private static final Logger log = LoggerFactory.getLogger(ClickHouseStatementImpl.class);

protected static class WrappedHttpEntity extends AbstractHttpEntity {
private final String sql;
private final HttpEntity entity;

public WrappedHttpEntity(String sql, HttpEntity entity) {
this.sql = sql;
this.entity = Objects.requireNonNull(entity);

this.chunked = entity.isChunked();
this.contentEncoding = entity.getContentEncoding();
this.contentType = entity.getContentType();
}

@Override
public boolean isRepeatable() {
return entity.isRepeatable();
}

@Override
public long getContentLength() {
return entity.getContentLength();
}

@Override
public InputStream getContent() throws IOException, IllegalStateException {
return entity.getContent();
}

@Override
public void writeTo(OutputStream outputStream) throws IOException {
if (sql != null && !sql.isEmpty()) {
outputStream.write(sql.getBytes(StandardCharsets.UTF_8));
outputStream.write('\n');
}

entity.writeTo(outputStream);
}

@Override
public boolean isStreaming() {
return entity.isStreaming();
}
}

private final CloseableHttpClient client;

private final HttpClientContext httpContext;
Expand Down Expand Up @@ -820,7 +867,7 @@ private List<NameValuePair> getUrlQueryParams(
) {
List<NameValuePair> result = new ArrayList<>();

if (sql != null) {
if (sql != null && !sql.isEmpty()) {
result.add(new BasicNameValuePair("query", sql));
}

Expand Down Expand Up @@ -1020,10 +1067,14 @@ void sendStream(Writer writer, HttpEntity content) throws ClickHouseException {
HttpEntity entity = null;
// TODO no parser involved so user can execute arbitray statement here
try {
URI uri = buildRequestUri(writer.getSql(), null, writer.getAdditionalDBParams(), writer.getRequestParams(), false);
String sql = writer.getSql();
boolean isContentCompressed = writer.getCompression() != ClickHouseCompression.none;
URI uri = buildRequestUri(
isContentCompressed ? sql : null, null, writer.getAdditionalDBParams(), writer.getRequestParams(), false);
uri = followRedirects(uri);

content = applyRequestBodyCompression(content);
content = applyRequestBodyCompression(
new WrappedHttpEntity(isContentCompressed ? null : sql, content));

HttpPost httpPost = new HttpPost(uri);

Expand All @@ -1050,7 +1101,8 @@ void sendStream(Writer writer, HttpEntity content) throws ClickHouseException {
}

private void checkForErrorAndThrow(HttpEntity entity, HttpResponse response) throws IOException, ClickHouseException {
if (response.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) {
StatusLine line = response.getStatusLine();
if (line.getStatusCode() != HttpURLConnection.HTTP_OK) {
InputStream messageStream = entity.getContent();
byte[] bytes = Utils.toByteArray(messageStream);
if (properties.isCompress()) {
Expand All @@ -1062,8 +1114,11 @@ private void checkForErrorAndThrow(HttpEntity entity, HttpResponse response) thr
}
}
EntityUtils.consumeQuietly(entity);
String chMessage = new String(bytes, StandardCharsets.UTF_8);
throw ClickHouseExceptionSpecifier.specify(chMessage, properties.getHost(), properties.getPort());
if (bytes.length == 0) {
throw ClickHouseExceptionSpecifier.specify(new IllegalStateException(line.toString()), properties.getHost(), properties.getPort());
} else {
throw ClickHouseExceptionSpecifier.specify(new String(bytes, StandardCharsets.UTF_8), properties.getHost(), properties.getPort());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.testng.Assert;
import org.testng.annotations.AfterTest;
Expand Down Expand Up @@ -289,4 +292,28 @@ public void testNullParameters() throws SQLException {
st.addBatch();
}

@Test
public void testBatchInsertWithLongQuery() throws SQLException {
int columnCount = 200;
try (Statement s = connection.createStatement()) {
String createColumns = IntStream.range(0, columnCount).mapToObj(
i -> "`looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongnaaaaameeeeeeee" + i + "` String "
).collect(Collectors.joining(","));
s.execute("DROP TABLE IF EXISTS test.batch_insert_with_long_query");
s.execute("CREATE TABLE test.batch_insert_with_long_query (" + createColumns + ") ENGINE = Memory");
}

String values = IntStream.range(0, columnCount).mapToObj(i -> "?").collect(Collectors.joining(","));
String columns = IntStream.range(0, columnCount).mapToObj(
i -> "looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongnaaaaameeeeeeee" + i
).collect(Collectors.joining(","));
int index = 1;
try (PreparedStatement s = connection.prepareStatement("INSERT INTO test.batch_insert_with_long_query (" + columns + ") VALUES (" + values + ")")) {
for (int i = 0; i < columnCount; i++) {
s.setString(index++, "12345");
}
s.addBatch();
s.executeBatch();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,40 @@ public void testExternalData() throws SQLException, UnsupportedEncodingException
}
}

// reproduce issue #634
public void testLargeQueryWithExternalData() throws Exception {
String serverVersion = connection.getServerVersion();
String[] rows = ClickHouseVersionNumberUtil.getMajorVersion(serverVersion) >= 21
&& ClickHouseVersionNumberUtil.getMinorVersion(serverVersion) >= 3
? new String[] { "1\tGroup\n" }
: new String[] { "1\tGroup", "1\tGroup\n" };

int length = 160000;
StringBuilder builder = new StringBuilder(length);
for (int i = 0; i < length; i++) {
builder.append('u');
}
String user = builder.toString();
for (String row : rows) {
try (ClickHouseStatement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(
"select UserName, GroupName from (select '"
+ user
+ "' as UserName, 1 as GroupId) as g"
+ "any left join groups using GroupId", null,
Collections.singletonList(new ClickHouseExternalData(
"groups", new ByteArrayInputStream(row.getBytes())
).withStructure("GroupId UInt8, GroupName String")))) {
Assert.assertTrue(rs.next());
String userName = rs.getString("UserName");
String groupName = rs.getString("GroupName");

Assert.assertEquals(userName, user);
Assert.assertEquals(groupName, "Group");
}
}
}


private InputStream getTSVStream(final int rowsCount) {
return new InputStream() {
Expand Down