Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ private QueryKit makeQueryControllerToolKit()
final Map<Class<? extends Query>, QueryKit> kitMap =
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
.put(ScanQuery.class, new ScanQueryKit(context.jsonMapper()))
.put(GroupByQuery.class, new GroupByQueryKit())
.put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper()))
.build();

return new MultiQueryKit(kitMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.joda.time.Interval;

import javax.annotation.Nullable;
Expand All @@ -67,6 +68,16 @@
*/
public class DataSourcePlan
{
/**
* A map with {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} set to null, so we can clear it from the context
* of subqueries.
*/
private static final Map<String, Object> CONTEXT_MAP_NO_SEGMENT_GRANULARITY = new HashMap<>();

static {
CONTEXT_MAP_NO_SEGMENT_GRANULARITY.put(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, null);
}

private final DataSource newDataSource;
private final List<InputSpec> inputSpecs;
private final IntSet broadcastInputs;
Expand Down Expand Up @@ -247,7 +258,10 @@ private static DataSourcePlan forQuery(
{
final QueryDefinition subQueryDef = queryKit.makeQueryDefinition(
queryId,
dataSource.getQuery(),

// Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the
// outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong.
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
queryKit,
ShuffleSpecFactories.subQueryWithMaxWorkerCount(maxWorkerCount),
maxWorkerCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.SortColumn;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -76,15 +75,16 @@ public class QueryKitUtils
*/
public static final String CTX_TIME_COLUMN_NAME = "__timeColumn";

private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();

public static Granularity getSegmentGranularityFromContext(@Nullable final Map<String, Object> context)
public static Granularity getSegmentGranularityFromContext(
final ObjectMapper objectMapper,
@Nullable final Map<String, Object> context
)
{
final Object o = context == null ? null : context.get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY);

if (o instanceof String) {
try {
return OBJECT_MAPPER.readValue((String) o, Granularity.class);
return objectMapper.readValue((String) o, Granularity.class);
}
catch (JsonProcessingException e) {
throw new ISE("Invalid segment granularity [%s]", o);
Expand Down Expand Up @@ -188,9 +188,10 @@ public static RowSignature sortableSignature(
* @throws IllegalArgumentException if the provided granularity is not supported
*/
@Nullable
public static VirtualColumn makeSegmentGranularityVirtualColumn(final Query<?> query)
public static VirtualColumn makeSegmentGranularityVirtualColumn(final ObjectMapper jsonMapper, final Query<?> query)
{
final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(query.getContext());
final Granularity segmentGranularity =
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, query.getContext());
final String timeColumnName = query.context().getString(QueryKitUtils.CTX_TIME_COLUMN_NAME);

if (timeColumnName == null || Granularities.ALL.equals(segmentGranularity)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.msq.querykit.groupby;

import com.fasterxml.jackson.databind.ObjectMapper;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
Expand Down Expand Up @@ -92,7 +93,8 @@ public GroupByPostShuffleFrameProcessor(
final FrameReader frameReader,
final RowSignature resultSignature,
final ClusterBy clusterBy,
final MemoryAllocator allocator
final MemoryAllocator allocator,
final ObjectMapper jsonMapper
)
{
this.query = query;
Expand All @@ -107,7 +109,7 @@ public GroupByPostShuffleFrameProcessor(
this.finalizeFn = makeFinalizeFn(query);
this.havingSpec = cloneHavingSpec(query);
this.columnSelectorFactoryForFrameWriter =
makeVirtualColumnsForFrameWriter(query).wrap(
makeVirtualColumnsForFrameWriter(jsonMapper, query).wrap(
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
query,
() -> outputRow,
Expand Down Expand Up @@ -311,9 +313,13 @@ private static HavingSpec cloneHavingSpec(final GroupByQuery query)
* Create virtual columns containing "bonus" fields that should be attached to the {@link FrameWriter} for
* this processor. Kept in sync with the signature generated by {@link GroupByQueryKit}.
*/
private static VirtualColumns makeVirtualColumnsForFrameWriter(final GroupByQuery query)
private static VirtualColumns makeVirtualColumnsForFrameWriter(
final ObjectMapper jsonMapper,
final GroupByQuery query
)
{
final VirtualColumn segmentGranularityVirtualColumn = QueryKitUtils.makeSegmentGranularityVirtualColumn(query);
final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);

if (segmentGranularityVirtualColumn == null) {
return VirtualColumns.EMPTY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public ProcessorsAndChannels<FrameProcessor<Long>, Long> makeProcessors(
readableInput.getChannelFrameReader(),
stageDefinition.getSignature(),
stageDefinition.getClusterBy(),
outputChannel.getFrameMemoryAllocator()
outputChannel.getFrameMemoryAllocator(),
frameContext.jsonMapper()
);
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.msq.querykit.groupby;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.SortColumn;
Expand Down Expand Up @@ -56,6 +57,13 @@

public class GroupByQueryKit implements QueryKit<GroupByQuery>
{
private final ObjectMapper jsonMapper;

public GroupByQueryKit(ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
}

@Override
public QueryDefinition makeQueryDefinition(
final String queryId,
Expand Down Expand Up @@ -85,7 +93,8 @@ public QueryDefinition makeQueryDefinition(
final GroupByQuery queryToRun = (GroupByQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());

final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(queryToRun.getContext());
final Granularity segmentGranularity =
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext());
final RowSignature intermediateSignature = computeIntermediateSignature(queryToRun);
final ClusterBy resultClusterBy =
QueryKitUtils.clusterByWithSegmentGranularity(computeClusterByForResults(queryToRun), segmentGranularity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.msq.querykit.scan;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
Expand Down Expand Up @@ -92,7 +93,8 @@ public ScanQueryFrameProcessor(
final ResourceHolder<WritableFrameChannel> outputChannel,
final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
@Nullable final AtomicLong runningCountForLimit,
final long memoryReservedForBroadcastJoin
final long memoryReservedForBroadcastJoin,
final ObjectMapper jsonMapper
)
{
super(
Expand All @@ -111,7 +113,8 @@ public ScanQueryFrameProcessor(
final List<VirtualColumn> frameWriterVirtualColumns = new ArrayList<>();
frameWriterVirtualColumns.add(partitionBoostVirtualColumn);

final VirtualColumn segmentGranularityVirtualColumn = QueryKitUtils.makeSegmentGranularityVirtualColumn(query);
final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);

if (segmentGranularityVirtualColumn != null) {
frameWriterVirtualColumns.add(segmentGranularityVirtualColumn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ protected FrameProcessor<Long> makeProcessor(
allocatorHolder
)),
runningCountForLimit,
frameContext.memoryParameters().getBroadcastJoinMemory()
frameContext.memoryParameters().getBroadcastJoinMemory(),
frameContext.jsonMapper()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public QueryDefinition makeQueryDefinition(
signatureToUse = scanSignature;
} else {
final RowSignature.Builder signatureBuilder = RowSignature.builder().addAll(scanSignature);
final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(queryToRun.getContext());
final Granularity segmentGranularity =
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext());
final List<SortColumn> clusterByColumns = new ArrayList<>();

// Add regular orderBys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ private static void validateInsert(

try {
segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(
plannerContext.getJsonMapper(),
plannerContext.queryContextMap()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
Expand Down Expand Up @@ -152,7 +153,8 @@ public void close()
},
new LazyResourceHolder<>(() -> Pair.of(frameWriterFactory, () -> {})),
null,
0L
0L,
new DefaultObjectMapper()
);

ListenableFuture<Long> retVal = exec.runFully(processor, null);
Expand Down