Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions codestyle/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
</Or>
</And>
</Match>
<Match>
<And>
<Bug pattern="SE_BAD_FIELD_STORE" />
<Class name="org.apache.druid.server.AsyncQueryForwardingServlet" />
</And>
</Match>

<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
<Bug pattern="BC_UNCONFIRMED_CAST"/>
Expand Down
8 changes: 8 additions & 0 deletions docs/querying/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,14 @@ try (Connection connection = DriverManager.getConnection(url, connectionProperti
}
```

It is also possible to use a protocol buffers JDBC connection with Druid, this offer reduced bloat and potential performance
improvements for larger result sets. To use it apply the following connection url instead, everything else remains the same
```
String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica-protobuf/;serialization=protobuf";
```

> The protobuf endpoint is also known to work with the official [Golang Avatica driver](https://github.com/apache/calcite-avatica-go)

Table metadata is available over JDBC using `connection.getMetaData()` or by querying the
["INFORMATION_SCHEMA" tables](#metadata-tables).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.auth.BasicCredentials;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.sql.avatica.DruidAvaticaHandler;
import org.apache.druid.sql.avatica.DruidAvaticaJsonHandler;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.utils.HttpUtil;
Expand Down Expand Up @@ -285,12 +285,12 @@ void verifySystemSchemaQueryFailure(

String getBrokerAvacticaUrl()
{
return "jdbc:avatica:remote:url=" + config.getBrokerUrl() + DruidAvaticaHandler.AVATICA_PATH;
return "jdbc:avatica:remote:url=" + config.getBrokerUrl() + DruidAvaticaJsonHandler.AVATICA_PATH;
}

String getRouterAvacticaUrl()
{
return "jdbc:avatica:remote:url=" + config.getRouterUrl() + DruidAvaticaHandler.AVATICA_PATH;
return "jdbc:avatica:remote:url=" + config.getRouterUrl() + DruidAvaticaJsonHandler.AVATICA_PATH;
}

void verifyAdminOptionsRequest()
Expand Down
9 changes: 4 additions & 5 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-guice</artifactId>
</dependency>
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
</dependency>

<!-- Tests -->
<dependency>
Expand Down Expand Up @@ -450,11 +454,6 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Provider;
import org.apache.calcite.avatica.remote.ProtobufTranslation;
import org.apache.calcite.avatica.remote.ProtobufTranslationImpl;
import org.apache.calcite.avatica.remote.Service;
import org.apache.commons.io.IOUtils;
import org.apache.druid.client.selector.Server;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Smile;
Expand Down Expand Up @@ -118,6 +122,7 @@ private static void handleException(HttpServletResponse response, ObjectMapper o
private final RequestLogger requestLogger;
private final GenericQueryMetricsFactory queryMetricsFactory;
private final AuthenticatorMapper authenticatorMapper;
private final ProtobufTranslation protobufTranslation;

private HttpClient broadcastClient;

Expand Down Expand Up @@ -145,6 +150,7 @@ public AsyncQueryForwardingServlet(
this.requestLogger = requestLogger;
this.queryMetricsFactory = queryMetricsFactory;
this.authenticatorMapper = authenticatorMapper;
this.protobufTranslation = new ProtobufTranslationImpl();
}

@Override
Expand Down Expand Up @@ -191,9 +197,16 @@ protected void service(HttpServletRequest request, HttpServletResponse response)
// them as a generic request.
final boolean isQueryEndpoint = requestURI.startsWith("/druid/v2") && !requestURI.startsWith("/druid/v2/sql");

final boolean isAvatica = requestURI.startsWith("/druid/v2/sql/avatica");
final boolean isAvaticaJson = requestURI.startsWith("/druid/v2/sql/avatica");
final boolean isAvaticaPb = requestURI.startsWith("/druid/v2/sql/avatica-protobuf");

if (isAvatica) {
if (isAvaticaPb) {
byte[] requestBytes = IOUtils.toByteArray(request.getInputStream());
Service.Request protobufRequest = this.protobufTranslation.parseRequest(requestBytes);
String connectionId = getAvaticaProtobufConnectionId(protobufRequest);
targetServer = hostFinder.findServerAvatica(connectionId);
request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes);
} else if (isAvaticaJson) {
Map<String, Object> requestMap = objectMapper.readValue(
request.getInputStream(),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
Expand Down Expand Up @@ -456,6 +469,95 @@ static String getAvaticaConnectionId(Map<String, Object> requestMap)
return (String) connectionIdObj;
}

static String getAvaticaProtobufConnectionId(Service.Request request)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side note, since we now have avatica stuff as dependencies to this project, it probably makes sense in a follow-up to modify the JSON version to use the types like this is doing instead of deserializing to a map.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how much there is to gain from that, what are you thinking would be the advantage? Also, looking through it I can't find an easy to use class like ProtobufTranslationImpl for json.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I guess the reason would be defensive for the most part, to more strongly type the check so that whenever we upgrade the library, stuff would fail at compile time instead of just fail in strange ways at run time if any of the requests ever change, however unlikely.

Some of the unit tests for the json path are using the avatica types for json to test this area already, so was just thinking since its used as a runtime depend now we could use the expected types for non tests too. But yeah, its definitely not necessary and doesn't provide a lot of gain, nor do I think we should do it in this PR or anything.

{
if (request instanceof Service.CatalogsRequest) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These if clause seems fragile from changes on Avatica request types (such as new request type), but I'm not sure if there is a better way..

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, it's a bit fragile, however, that, I'm afraid, is the nature of protobufs. I'm not sure how likely they are to add new request types without significant overhaul to Calcite though

return ((Service.CatalogsRequest) request).connectionId;
}

if (request instanceof Service.SchemasRequest) {
return ((Service.SchemasRequest) request).connectionId;
}

if (request instanceof Service.TablesRequest) {
return ((Service.TablesRequest) request).connectionId;
}

if (request instanceof Service.TypeInfoRequest) {
return ((Service.TypeInfoRequest) request).connectionId;
}

if (request instanceof Service.ColumnsRequest) {
return ((Service.ColumnsRequest) request).connectionId;
}

if (request instanceof Service.ExecuteRequest) {
return ((Service.ExecuteRequest) request).statementHandle.connectionId;
}

if (request instanceof Service.TableTypesRequest) {
return ((Service.TableTypesRequest) request).connectionId;
}

if (request instanceof Service.PrepareRequest) {
return ((Service.PrepareRequest) request).connectionId;
}

if (request instanceof Service.PrepareAndExecuteRequest) {
return ((Service.PrepareAndExecuteRequest) request).connectionId;
}

if (request instanceof Service.FetchRequest) {
return ((Service.FetchRequest) request).connectionId;
}

if (request instanceof Service.CreateStatementRequest) {
return ((Service.CreateStatementRequest) request).connectionId;
}

if (request instanceof Service.CloseStatementRequest) {
return ((Service.CloseStatementRequest) request).connectionId;
}

if (request instanceof Service.OpenConnectionRequest) {
return ((Service.OpenConnectionRequest) request).connectionId;
}

if (request instanceof Service.CloseConnectionRequest) {
return ((Service.CloseConnectionRequest) request).connectionId;
}

if (request instanceof Service.ConnectionSyncRequest) {
return ((Service.ConnectionSyncRequest) request).connectionId;
}

if (request instanceof Service.DatabasePropertyRequest) {
return ((Service.DatabasePropertyRequest) request).connectionId;
}

if (request instanceof Service.SyncResultsRequest) {
return ((Service.SyncResultsRequest) request).connectionId;
}

if (request instanceof Service.CommitRequest) {
return ((Service.CommitRequest) request).connectionId;
}

if (request instanceof Service.RollbackRequest) {
return ((Service.RollbackRequest) request).connectionId;
}

if (request instanceof Service.PrepareAndExecuteBatchRequest) {
return ((Service.PrepareAndExecuteBatchRequest) request).connectionId;
}

if (request instanceof Service.ExecuteBatchRequest) {
return ((Service.ExecuteBatchRequest) request).connectionId;
}

throw new IAE("Received an unknown Avatica protobuf request");
}

private class MetricsEmittingProxyResponseListener<T> extends ProxyResponseListener
{
private final HttpServletRequest req;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,45 @@ public void testGetAvaticaConnectionId() throws JsonProcessingException
}
}

@Test
public void testGetAvaticaProtobufConnectionId()
{
final String query = "SELECT someColumn FROM druid.someTable WHERE someColumn IS NOT NULL";
final String connectionId = "000000-0000-0000-00000000";
final int statementId = 1337;
final int maxNumRows = 1000;

final List<? extends Service.Request> avaticaRequests = ImmutableList.of(
new Service.CatalogsRequest(connectionId),
new Service.SchemasRequest(connectionId, "druid", null),
new Service.TablesRequest(connectionId, "druid", "druid", null, null),
new Service.ColumnsRequest(connectionId, "druid", "druid", "someTable", null),
new Service.PrepareAndExecuteRequest(
connectionId,
statementId,
query,
maxNumRows
),
new Service.PrepareRequest(connectionId, query, maxNumRows),
new Service.ExecuteRequest(
new Meta.StatementHandle(connectionId, statementId, null),
ImmutableList.of(),
maxNumRows
),
new Service.CloseStatementRequest(connectionId, statementId),
new Service.CloseConnectionRequest(connectionId)
);


for (Service.Request request : avaticaRequests) {
Assert.assertEquals(
"failed",
connectionId,
AsyncQueryForwardingServlet.getAvaticaProtobufConnectionId(request)
);
}
}

private static Map<String, Object> asMap(String json, ObjectMapper mapper) throws JsonProcessingException
{
return mapper.readValue(json, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
import org.apache.druid.server.security.AuthenticationUtils;
import org.apache.druid.server.security.Authenticator;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.sql.avatica.DruidAvaticaHandler;
import org.apache.druid.sql.avatica.DruidAvaticaJsonHandler;
import org.apache.druid.sql.avatica.DruidAvaticaProtobufHandler;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
Expand All @@ -57,7 +58,8 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
// JDBC authentication uses the JDBC connection context instead of HTTP headers, skip the normal auth checks.
// The router will keep the connection context in the forwarded message, and the broker is responsible for
// performing the auth checks.
DruidAvaticaHandler.AVATICA_PATH
DruidAvaticaJsonHandler.AVATICA_PATH,
DruidAvaticaProtobufHandler.AVATICA_PATH
);

private final DruidHttpClientConfig routerHttpClientConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.sql.avatica", AvaticaServerConfig.class);
binder.bind(AvaticaMonitor.class).in(LazySingleton.class);
JettyBindings.addHandler(binder, DruidAvaticaHandler.class);
JettyBindings.addHandler(binder, DruidAvaticaJsonHandler.class);
JettyBindings.addHandler(binder, DruidAvaticaProtobufHandler.class);
MetricsModule.register(binder, AvaticaMonitor.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

public class DruidAvaticaHandler extends AvaticaJsonHandler
public class DruidAvaticaJsonHandler extends AvaticaJsonHandler
{
public static final String AVATICA_PATH = "/druid/v2/sql/avatica/";

@Inject
public DruidAvaticaHandler(
public DruidAvaticaJsonHandler(
final DruidMeta druidMeta,
@Self final DruidNode druidNode,
final AvaticaMonitor avaticaMonitor
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.avatica;

import com.google.inject.Inject;
import org.apache.calcite.avatica.remote.LocalService;
import org.apache.calcite.avatica.remote.Service;
import org.apache.calcite.avatica.server.AvaticaProtobufHandler;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.server.DruidNode;
import org.eclipse.jetty.server.Request;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

public class DruidAvaticaProtobufHandler extends AvaticaProtobufHandler
{
public static final String AVATICA_PATH = "/druid/v2/sql/avatica-protobuf/";

@Inject
public DruidAvaticaProtobufHandler(
final DruidMeta druidMeta,
@Self final DruidNode druidNode,
final AvaticaMonitor avaticaMonitor
)
{
super(new LocalService(druidMeta), avaticaMonitor);
setServerRpcMetadata(new Service.RpcMetadataResponse(druidNode.getHostAndPortToUse()));
}

@Override
public void handle(
final String target,
final Request baseRequest,
final HttpServletRequest request,
final HttpServletResponse response
) throws IOException, ServletException
{
if (request.getRequestURI().equals(AVATICA_PATH)) {
super.handle(target, baseRequest, request, response);
}
}
}
6 changes: 4 additions & 2 deletions sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ public void openConnection(final ConnectionHandle ch, final Map<String, String>
{
// Build connection context.
final ImmutableMap.Builder<String, Object> context = ImmutableMap.builder();
for (Map.Entry<String, String> entry : info.entrySet()) {
context.put(entry);
if (info != null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wonder if this was causing some problem before... looking at other implementations some do check for null on this field, 👍

for (Map.Entry<String, String> entry : info.entrySet()) {
context.put(entry);
}
}
openDruidConnection(ch.id, context.build());
}
Expand Down
Loading