Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 69 additions & 61 deletions sql/src/main/java/io/druid/sql/http/SqlResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.ISE;
Expand Down Expand Up @@ -84,6 +85,74 @@ public Response doPost(final SqlQuery sqlQuery) throws SQLException, IOException
try (final DruidPlanner planner = plannerFactory.createPlanner(sqlQuery.getContext())) {
plannerResult = planner.plan(sqlQuery.getQuery());
timeZone = planner.getPlannerContext().getTimeZone();

// Remember which columns are time-typed, so we can emit ISO8601 instead of millis values.
final List<RelDataTypeField> fieldList = plannerResult.rowType().getFieldList();
final boolean[] timeColumns = new boolean[fieldList.size()];
final boolean[] dateColumns = new boolean[fieldList.size()];
for (int i = 0; i < fieldList.size(); i++) {
final SqlTypeName sqlTypeName = fieldList.get(i).getType().getSqlTypeName();
timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP;
dateColumns[i] = sqlTypeName == SqlTypeName.DATE;
}

final Yielder<Object[]> yielder0 = Yielders.each(plannerResult.run());

try {
return Response.ok(
new StreamingOutput()
{
@Override
public void write(final OutputStream outputStream) throws IOException, WebApplicationException
{
Yielder<Object[]> yielder = yielder0;

try (final JsonGenerator jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream)) {
jsonGenerator.writeStartArray();

while (!yielder.isDone()) {
final Object[] row = yielder.get();
jsonGenerator.writeStartObject();
for (int i = 0; i < fieldList.size(); i++) {
final Object value;

if (timeColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteTimestampToJoda((long) row[i], timeZone)
);
} else if (dateColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteDateToJoda((int) row[i], timeZone)
);
} else {
value = row[i];
}

jsonGenerator.writeObjectField(fieldList.get(i).getName(), value);
}
jsonGenerator.writeEndObject();
yielder = yielder.next(null);
}

jsonGenerator.writeEndArray();
jsonGenerator.flush();

// End with CRLF
outputStream.write('\r');
outputStream.write('\n');
}
finally {
yielder.close();
}
}
}
).build();
}
catch (Throwable e) {
// make sure to close yielder if anything happened before starting to serialize the response.
yielder0.close();
throw Throwables.propagate(e);
}
}
catch (Exception e) {
log.warn(e, "Failed to handle query: %s", sqlQuery);
Expand All @@ -101,66 +170,5 @@ public Response doPost(final SqlQuery sqlQuery) throws SQLException, IOException
.entity(jsonMapper.writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(exceptionToReport)))
.build();
}

// Remember which columns are time-typed, so we can emit ISO8601 instead of millis values.
final List<RelDataTypeField> fieldList = plannerResult.rowType().getFieldList();
final boolean[] timeColumns = new boolean[fieldList.size()];
final boolean[] dateColumns = new boolean[fieldList.size()];
for (int i = 0; i < fieldList.size(); i++) {
final SqlTypeName sqlTypeName = fieldList.get(i).getType().getSqlTypeName();
timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP;
dateColumns[i] = sqlTypeName == SqlTypeName.DATE;
}

final Yielder<Object[]> yielder0 = Yielders.each(plannerResult.run());

return Response.ok(
new StreamingOutput()
{
@Override
public void write(final OutputStream outputStream) throws IOException, WebApplicationException
{
Yielder<Object[]> yielder = yielder0;

try (final JsonGenerator jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream)) {
jsonGenerator.writeStartArray();

while (!yielder.isDone()) {
final Object[] row = yielder.get();
jsonGenerator.writeStartObject();
for (int i = 0; i < fieldList.size(); i++) {
final Object value;

if (timeColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteTimestampToJoda((long) row[i], timeZone)
);
} else if (dateColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteDateToJoda((int) row[i], timeZone)
);
} else {
value = row[i];
}

jsonGenerator.writeObjectField(fieldList.get(i).getName(), value);
}
jsonGenerator.writeEndObject();
yielder = yielder.next(null);
}

jsonGenerator.writeEndArray();
jsonGenerator.flush();

// End with CRLF
outputStream.write('\r');
outputStream.write('\n');
}
finally {
yielder.close();
}
}
}
).build();
}
}
80 changes: 51 additions & 29 deletions sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.query.QueryInterruptedException;
import io.druid.query.ResourceLimitExceededException;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
import io.druid.sql.calcite.planner.PlannerConfig;
Expand All @@ -36,12 +39,12 @@
import io.druid.sql.http.SqlQuery;
import io.druid.sql.http.SqlResource;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.ValidationException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

import javax.ws.rs.core.Response;
Expand All @@ -54,9 +57,6 @@ public class SqlResourceTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

Expand Down Expand Up @@ -92,7 +92,7 @@ public void testCountStar() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT COUNT(*) AS cnt FROM druid.foo", null)
);
).rhs;

Assert.assertEquals(
ImmutableList.of(
Expand All @@ -107,7 +107,7 @@ public void testTimestampsInResponse() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1", null)
);
).rhs;

Assert.assertEquals(
ImmutableList.of(
Expand All @@ -125,7 +125,7 @@ public void testTimestampsInResponseLosAngelesTimeZone() throws Exception
"SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1",
ImmutableMap.<String, Object>of(PlannerContext.CTX_SQL_TIME_ZONE, "America/Los_Angeles")
)
);
).rhs;

Assert.assertEquals(
ImmutableList.of(
Expand All @@ -140,7 +140,7 @@ public void testFieldAliasingSelect() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo LIMIT 1", null)
);
).rhs;

Assert.assertEquals(
ImmutableList.of(
Expand All @@ -155,7 +155,7 @@ public void testFieldAliasingGroupBy() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo GROUP BY dim2", null)
);
).rhs;

Assert.assertEquals(
ImmutableList.of(
Expand All @@ -172,7 +172,7 @@ public void testExplainCountStar() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo", null)
);
).rhs;

Assert.assertEquals(
ImmutableList.of(
Expand All @@ -188,43 +188,65 @@ public void testExplainCountStar() throws Exception
@Test
public void testCannotValidate() throws Exception
{
expectedException.expect(QueryInterruptedException.class);
expectedException.expectMessage("Column 'dim3' not found in any table");
final QueryInterruptedException exception = doPost(new SqlQuery("SELECT dim3 FROM druid.foo", null)).lhs;

doPost(
new SqlQuery("SELECT dim3 FROM druid.foo", null)
);

Assert.fail();
Assert.assertNotNull(exception);
Assert.assertEquals(QueryInterruptedException.UNKNOWN_EXCEPTION, exception.getErrorCode());
Assert.assertEquals(ValidationException.class.getName(), exception.getErrorClass());
Assert.assertTrue(exception.getMessage().contains("Column 'dim3' not found in any table"));
}

@Test
public void testCannotConvert() throws Exception
{
expectedException.expect(QueryInterruptedException.class);
expectedException.expectMessage("Cannot build plan for query: SELECT TRIM(dim1) FROM druid.foo");

// TRIM unsupported
doPost(new SqlQuery("SELECT TRIM(dim1) FROM druid.foo", null));
final QueryInterruptedException exception = doPost(new SqlQuery("SELECT TRIM(dim1) FROM druid.foo", null)).lhs;

Assert.fail();
Assert.assertNotNull(exception);
Assert.assertEquals(QueryInterruptedException.UNKNOWN_EXCEPTION, exception.getErrorCode());
Assert.assertEquals(ISE.class.getName(), exception.getErrorClass());
Assert.assertTrue(exception.getMessage().contains("Cannot build plan for query: SELECT TRIM(dim1) FROM druid.foo"));
}

private List<Map<String, Object>> doPost(final SqlQuery query) throws Exception
@Test
public void testResourceLimitExceeded() throws Exception
{
final QueryInterruptedException exception = doPost(
new SqlQuery(
"SELECT DISTINCT dim1 FROM foo",
ImmutableMap.<String, Object>of(
"maxMergingDictionarySize", 1
)
)
).lhs;

Assert.assertNotNull(exception);
Assert.assertEquals(exception.getErrorCode(), QueryInterruptedException.RESOURCE_LIMIT_EXCEEDED);
Assert.assertEquals(exception.getErrorClass(), ResourceLimitExceededException.class.getName());
}

// Returns either an error or a result.
private Pair<QueryInterruptedException, List<Map<String, Object>>> doPost(final SqlQuery query) throws Exception
{
final Response response = resource.doPost(query);
if (response.getStatus() == 200) {
final StreamingOutput output = (StreamingOutput) response.getEntity();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
output.write(baos);
return JSON_MAPPER.readValue(
baos.toByteArray(),
new TypeReference<List<Map<String, Object>>>()
{
}
return Pair.of(
null,
JSON_MAPPER.<List<Map<String, Object>>>readValue(
baos.toByteArray(),
new TypeReference<List<Map<String, Object>>>()
{
}
)
);
} else {
throw JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryInterruptedException.class);
return Pair.of(
JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryInterruptedException.class),
null
);
}
}
}