Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/querying/query-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ See [SQL query context](sql-query-context.md) for other query context parameters
|`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:<br />- Log the stack trace of the exception (if any) produced by the query |
|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.|
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. `preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones. Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries. MSQ does not query Historicals directly, and Dart will not respect this context parameter.|
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. `preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones. Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries. MSQ does not query Historicals directly.|

## Parameters by query type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.ControllerMemoryParameters;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.exec.WorkerFailureListener;
import org.apache.druid.msq.exec.WorkerManager;
import org.apache.druid.msq.indexing.IndexerControllerContext;
Expand Down Expand Up @@ -77,12 +78,15 @@ public class DartControllerContext implements ControllerContext
*/
public static final int DEFAULT_MAX_NON_LEAF_WORKER_COUNT = 1;

public static final SegmentSource DEFAULT_SEGMENT_SOURCE = SegmentSource.REALTIME;

private final Injector injector;
private final ObjectMapper jsonMapper;
private final DruidNode selfNode;
private final DartWorkerClient workerClient;
private final TimelineServerView serverView;
private final MemoryIntrospector memoryIntrospector;
private final QueryContext context;
private final ServiceMetricEvent.Builder metricBuilder;
private final ServiceEmitter emitter;

Expand All @@ -93,7 +97,8 @@ public DartControllerContext(
final DartWorkerClient workerClient,
final MemoryIntrospector memoryIntrospector,
final TimelineServerView serverView,
final ServiceEmitter emitter
final ServiceEmitter emitter,
final QueryContext context
)
{
this.injector = injector;
Expand All @@ -102,6 +107,7 @@ public DartControllerContext(
this.workerClient = workerClient;
this.serverView = serverView;
this.memoryIntrospector = memoryIntrospector;
this.context = context;
this.metricBuilder = new ServiceMetricEvent.Builder();
this.emitter = emitter;
}
Expand Down Expand Up @@ -180,7 +186,7 @@ public DruidNode selfNode()
@Override
public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager)
{
return DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(), serverView);
return DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(), serverView, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@

import org.apache.druid.msq.dart.controller.sql.DartQueryMaker;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.query.QueryContext;

/**
* Class for creating {@link ControllerContext} in {@link DartQueryMaker}.
*/
public interface DartControllerContextFactory
{
ControllerContext newContext(String queryId);
ControllerContext newContext(QueryContext queryContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.druid.msq.dart.worker.DartWorkerClientImpl;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.server.DruidNode;

Expand Down Expand Up @@ -68,16 +70,18 @@ public DartControllerContextFactoryImpl(
}

@Override
public ControllerContext newContext(final String queryId)
public ControllerContext newContext(final QueryContext context)
{
final String queryId = context.getString(QueryContexts.CTX_DART_QUERY_ID);
return new DartControllerContext(
injector,
jsonMapper,
selfNode,
new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()),
memoryIntrospector,
serverView,
emitter
emitter,
context
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.dart.controller;

import org.apache.druid.msq.dart.worker.DartQueryableSegment;
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;

import java.util.ArrayList;
import java.util.List;

/**
* Represents the set of segments assigned to a particular dart worker, used by {@link DartTableInputSpecSlicer}.
*/
public class DartSegmentAssignment
{
private final List<DartQueryableSegment> dartQueryableSegments;
private final List<DataServerRequestDescriptor> dataServerRequestDescriptor;

public DartSegmentAssignment(
List<DartQueryableSegment> dartQueryableSegments,
List<DataServerRequestDescriptor> dataServerRequestDescriptor
)
{
this.dartQueryableSegments = dartQueryableSegments;
this.dataServerRequestDescriptor = dataServerRequestDescriptor;
}

public static DartSegmentAssignment empty()
{
return new DartSegmentAssignment(new ArrayList<>(), new ArrayList<>());
}

public void addSegments(DartQueryableSegment segment)
{
dartQueryableSegments.add(segment);
}

public void addRequest(DataServerRequestDescriptor requestDescriptor)
{
dataServerRequestDescriptor.add(requestDescriptor);
}

public List<DartQueryableSegment> getDartQueryableSegments()
{
return dartQueryableSegments;
}

public List<DataServerRequestDescriptor> getDataServerRequestDescriptor()
{
return dataServerRequestDescriptor;
}

public boolean isEmpty()
{
return dataServerRequestDescriptor.isEmpty() && dartQueryableSegments.isEmpty();
}
}
Loading
Loading