Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,24 +70,24 @@ public static <T extends Enum<T>> T getEnumIfPresent(final Class<T> enumClass, f
}

/**
* If first argument is not null, return it, else return the other argument. Sort of like
* {@link com.google.common.base.Objects#firstNonNull(T, T)} except will not explode if both arguments are
* null.
* If first argument is not null, return it, else return the other argument.
* Sort of like
* {@link static <T> com.google.common.base.Objects#firstNonNull(T, T)} except
* will not explode if both arguments are null.
*/
@Nullable
public static <T> T firstNonNull(@Nullable T arg1, @Nullable T arg2)
{
if (arg1 == null) {
return arg2;
}
return arg1;
return arg1 == null ? arg2 : arg1;
}

/**
* Cancel futures manually, because sometime we can't cancel all futures in {@link com.google.common.util.concurrent.Futures.CombinedFuture}
* Cancel futures manually, because sometime we can't cancel all futures in
* {@code com.google.common.util.concurrent.Futures.CombinedFuture}
* automatically. Especially when we call
* {@link static <V> ListenableFuture<List<V>> com.google.common.util.concurrent.Futures#allAsList(Iterable<? extends ListenableFuture <? extends V>> futures)} to create a batch of
* future.
* {@link static <V> ListenableFuture<List<V>>
* com.google.common.util.concurrent.Futures#allAsList(Iterable<? extends ListenableFuture <? extends V>> futures)}
* to create a batch of futures.
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void testJdbcPrepareStatementQuery()
}
}

@Test(expectedExceptions = AvaticaSqlException.class, expectedExceptionsMessageRegExp = ".* Parameter at position\\[0] is not bound")
@Test(expectedExceptions = AvaticaSqlException.class, expectedExceptionsMessageRegExp = ".* Parameter at position \\[0] is not bound")
public void testJdbcPrepareStatementQueryMissingParameters() throws SQLException
{
for (String url : connections) {
Expand Down
117 changes: 67 additions & 50 deletions sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.planner.PrepareResult;
import org.apache.druid.sql.calcite.planner.ValidationResult;
import org.apache.druid.sql.http.SqlParameter;
import org.apache.druid.sql.http.SqlQuery;

Expand All @@ -69,6 +68,7 @@
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -106,13 +106,18 @@ public class SqlLifecycle
@GuardedBy("stateLock")
private State state = State.NEW;

// init during intialize
// init during initialize
private String sql;
private QueryContext queryContext;
private List<TypedValue> parameters;

// init during plan
/**
* The Druid planner follows the SQL statement through the lifecycle.
* The planner's state is start --> validate --> (prepare | plan).
*/
private DruidPlanner planner;
private PlannerContext plannerContext;
private ValidationResult validationResult;
private PrepareResult prepareResult;
private PlannerResult plannerResult;

Expand Down Expand Up @@ -170,7 +175,7 @@ private String sqlQueryId()
}

/**
* Assign dynamic parameters to be used to substitute values during query exection. This can be performed at any
* Assign dynamic parameters to be used to substitute values during query execution. This can be performed at any
* part of the lifecycle.
*/
public void setParameters(List<TypedValue> parameters)
Expand All @@ -196,14 +201,13 @@ public void validateAndAuthorize(AuthenticationResult authenticationResult)
}
transition(State.INITIALIZED, State.AUTHORIZING);
validate(authenticationResult);
Access access = doAuthorize(
doAuthorize(resourceActions ->
AuthorizationUtils.authorizeAllResourceActions(
authenticationResult,
validationResult.getResourceActions(),
resourceActions,
plannerFactory.getAuthorizerMapper()
)
);
checkAccess(access);
}

/**
Expand All @@ -218,26 +222,29 @@ public void validateAndAuthorize(HttpServletRequest req)
transition(State.INITIALIZED, State.AUTHORIZING);
AuthenticationResult authResult = AuthorizationUtils.authenticationResultFromRequest(req);
validate(authResult);
Access access = doAuthorize(
doAuthorize(resourceActions ->
AuthorizationUtils.authorizeAllResourceActions(
req,
validationResult.getResourceActions(),
resourceActions,
plannerFactory.getAuthorizerMapper()
)
);
checkAccess(access);
}

private ValidationResult validate(AuthenticationResult authenticationResult)
/**
* Perform the validation step on the Druid planner, leaving the planner
* ready to perform either prepare or plan.
*/
private void validate(AuthenticationResult authenticationResult)
{
try (DruidPlanner planner = plannerFactory.createPlanner(sql, queryContext)) {
try {
planner = plannerFactory.createPlanner(sql, queryContext);
// set planner context for logs/metrics in case something explodes early
this.plannerContext = planner.getPlannerContext();
this.plannerContext.setAuthenticationResult(authenticationResult);
plannerContext = planner.getPlannerContext();
plannerContext.setAuthenticationResult(authenticationResult);
// set parameters on planner context, if parameters have already been set
this.plannerContext.setParameters(parameters);
this.validationResult = planner.validate(authConfig.authorizeQueryContextParams());
return validationResult;
plannerContext.setParameters(parameters);
planner.validate();
}
// we can't collapse catch clauses since SqlPlanningException has type-sensitive constructors.
catch (SqlParseException e) {
Expand All @@ -248,46 +255,43 @@ private ValidationResult validate(AuthenticationResult authenticationResult)
}
}

private Access doAuthorize(final Access authorizationResult)
private void doAuthorize(Function<Set<ResourceAction>, Access> authorizer)
{
Access authorizationResult = planner.authorize(
authorizer,
authConfig.authorizeQueryContextParams());
if (!authorizationResult.isAllowed()) {
// Not authorized; go straight to Jail, do not pass Go.
transition(State.AUTHORIZING, State.UNAUTHORIZED);
} else {
transition(State.AUTHORIZING, State.AUTHORIZED);
}
return authorizationResult;
}

private void checkAccess(Access access)
{
plannerContext.setAuthorizationResult(access);
if (!access.isAllowed()) {
throw new ForbiddenException(access.toString());
if (!authorizationResult.isAllowed()) {
throw new ForbiddenException(authorizationResult.toString());
}
}

/**
* Prepare the query lifecycle for execution, without completely planning into something that is executable, but
* including some initial parsing and validation and any dyanmic parameter type resolution, to support prepared
* Prepare the query lifecycle for execution, without completely planning into
* something that is executable, but including some initial parsing and
* validation and any dynamic parameter type resolution, to support prepared
* statements via JDBC.
*
* The planner must have already performed the validation step: the planner
* state is reused here.
*/
public PrepareResult prepare() throws RelConversionException
public PrepareResult prepare()
{
synchronized (stateLock) {
if (state != State.AUTHORIZED) {
throw new ISE("Cannot prepare because current state[%s] is not [%s].", state, State.AUTHORIZED);
throw new ISE("Cannot prepare because current state [%s] is not [%s].", state, State.AUTHORIZED);
}
}
Preconditions.checkNotNull(plannerContext, "Cannot prepare, plannerContext is null");
try (DruidPlanner planner = plannerFactory.createPlannerWithContext(plannerContext)) {
try {
this.prepareResult = planner.prepare();
return prepareResult;
}
// we can't collapse catch clauses since SqlPlanningException has type-sensitive constructors.
catch (SqlParseException e) {
throw new SqlPlanningException(e);
}
catch (ValidationException e) {
throw new SqlPlanningException(e);
}
Expand All @@ -296,22 +300,27 @@ public PrepareResult prepare() throws RelConversionException
/**
* Plan the query to enable execution.
*
* If successful, the lifecycle will first transition from {@link State#AUTHORIZED} to {@link State#PLANNED}.
* The planner must have already performed the validation step: the planner
* state is reused here.
*
* If successful, the lifecycle will first transition from
* {@link State#AUTHORIZED} to {@link State#PLANNED}.
*/
public void plan() throws RelConversionException
{
transition(State.AUTHORIZED, State.PLANNED);
Preconditions.checkNotNull(plannerContext, "Cannot plan, plannerContext is null");
try (DruidPlanner planner = plannerFactory.createPlannerWithContext(plannerContext)) {
try {
this.plannerResult = planner.plan();
}
// we can't collapse catch clauses since SqlPlanningException has type-sensitive constructors.
catch (SqlParseException e) {
throw new SqlPlanningException(e);
}
catch (ValidationException e) {
throw new SqlPlanningException(e);
}
finally {
// Done with the planner, close it.
planner.close();
planner = null;
}
}

/**
Expand Down Expand Up @@ -376,20 +385,22 @@ public void after(boolean isDone, Throwable thrown)
});
}


@VisibleForTesting
public ValidationResult runAnalyzeResources(AuthenticationResult authenticationResult)
public Set<ResourceAction> runAnalyzeResources(AuthenticationResult authenticationResult)
{
return validate(authenticationResult);
validate(authenticationResult);
return getRequiredResourceActions();
}

public Set<ResourceAction> getRequiredResourceActions()
{
return Preconditions.checkNotNull(validationResult, "validationResult").getResourceActions();
return planner == null
? null
: planner.resourceActions(authConfig.authorizeQueryContextParams());
}

/**
* Cancel all native queries associated to this lifecycle.
* Cancel all native queries associated with this lifecycle.
*
* This method is thread-safe.
*/
Expand All @@ -405,7 +416,7 @@ public void cancel()
final CopyOnWriteArrayList<String> nativeQueryIds = plannerContext.getNativeQueryIds();

for (String nativeQueryId : nativeQueryIds) {
log.debug("canceling native query [%s]", nativeQueryId);
log.debug("Canceling native query [%s]", nativeQueryId);
queryScheduler.cancelQuery(nativeQueryId);
}
}
Expand Down Expand Up @@ -440,6 +451,11 @@ public void finalizeStateAndEmitLogsAndMetrics(
}
}

if (planner != null) {
planner.close();
planner = null;
}

final boolean success = e == null;
final long queryTimeNs = System.nanoTime() - startNs;

Expand All @@ -449,10 +465,11 @@ public void finalizeStateAndEmitLogsAndMetrics(
metricBuilder.setDimension("id", plannerContext.getSqlQueryId());
metricBuilder.setDimension("nativeQueryIds", plannerContext.getNativeQueryIds().toString());
}
if (validationResult != null) {
Set<ResourceAction> actions = getRequiredResourceActions();
if (actions != null) {
metricBuilder.setDimension(
"dataSource",
validationResult.getResourceActions()
actions
.stream()
.map(action -> action.getResource().getName())
.collect(Collectors.toList())
Expand Down Expand Up @@ -527,7 +544,7 @@ private void transition(final State from, final State to)
}
if (state != from) {
throw new ISE(
"Cannot transition from[%s] to[%s] because current state[%s] is not [%s].",
"Cannot transition from [%s] to [%s] because current state [%s] is not [%s].",
from,
to,
state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public DruidStatement createStatement(SqlLifecycleFactory sqlLifecycleFactory)
}

if (statements.size() >= maxStatements) {
throw DruidMeta.logFailure(new ISE("Too many open statements, limit is[%,d]", maxStatements));
throw DruidMeta.logFailure(new ISE("Too many open statements, limit is [%,d]", maxStatements));
}

@SuppressWarnings("GuardedBy")
Expand All @@ -100,14 +100,14 @@ public DruidStatement createStatement(SqlLifecycleFactory sqlLifecycleFactory)
sqlLifecycleFactory.factorize(),
() -> {
// onClose function for the statement
LOG.debug("Connection[%s] closed statement[%s].", connectionId, statementId);
LOG.debug("Connection [%s] closed statement [%s].", connectionId, statementId);
// statements will be accessed unsynchronized to avoid deadlock
statements.remove(statementId);
}
);

statements.put(statementId, statement);
LOG.debug("Connection[%s] opened statement[%s].", connectionId, statementId);
LOG.debug("Connection [%s] opened statement [%s].", connectionId, statementId);
return statement;
}
}
Expand Down Expand Up @@ -145,11 +145,11 @@ public void close()
statement.close();
}
catch (Exception e) {
LOG.warn("Connection[%s] failed to close statement[%s]!", connectionId, statement.getStatementId());
LOG.warn("Connection [%s] failed to close statement [%s]!", connectionId, statement.getStatementId());
}
}

LOG.debug("Connection[%s] closed.", connectionId);
LOG.debug("Connection [%s] closed.", connectionId);
open = false;
}
}
Expand Down
Loading