Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.FrameProcessorFactory;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.StageDefinition;
Expand Down Expand Up @@ -1193,8 +1194,35 @@ private Int2ObjectMap<Object> makeWorkerFactoryInfosForStage(
{
if (MSQControllerTask.isIngestion(querySpec) &&
stageNumber == queryDef.getFinalStageDefinition().getStageNumber()) {
// noinspection unchecked,rawtypes
return (Int2ObjectMap) makeSegmentGeneratorWorkerFactoryInfos(workerInputs, segmentsToGenerate);
final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
if (!destination.doesSegmentMorphing()) {
// noinspection unchecked,rawtypes
return (Int2ObjectMap) makeSegmentGeneratorWorkerFactoryInfos(workerInputs, segmentsToGenerate);
} else {
// worker info is the new lock version
if (destination.getReplaceTimeChunks().size() != 1) {
throw new ISE(
"Must have single interval in replaceTimeChunks, but got[%s]",
destination.getReplaceTimeChunks()
);
}
try {
final List<TaskLock> locks;
locks = context.taskActionClient().submit(new LockListAction());
if (locks.size() == 1) {
final Int2ObjectMap<Object> retVal = new Int2ObjectAVLTreeMap<>();
for (int worker : workerInputs.workers()) {
retVal.put(worker, locks.get(0).getVersion());
}
return retVal;
} else {
throw new ISE("Got number of locks other than one: [%s]", locks);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
} else {
return null;
}
Expand Down Expand Up @@ -1810,6 +1838,35 @@ private static QueryDefinition makeQueryDefinition(
}
}

// Possibly add a segment morpher stage.
if (((DataSourceMSQDestination) querySpec.getDestination()).doesSegmentMorphing()) {
final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
final FrameProcessorFactory segmentMorphFactory = destination.getSegmentMorphFactory();

if (!destination.isReplaceTimeChunks()) {
throw new MSQException(UnknownFault.forMessage("segmentMorphFactory requires replaceTimeChunks"));
}

builder.add(
StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(
new TableInputSpec(
destination.getDataSource(),
destination.getReplaceTimeChunks(),
null,
null
),
new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())
)
.broadcastInputs(IntSet.of(1))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
.processorFactory(segmentMorphFactory)
);

// If there was a segment morpher, return immediately; don't add a segment-generation stage.
return builder.build();
}

// Then, add a segment-generation stage.
final DataSchema dataSchema =
makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper);
Expand Down Expand Up @@ -2723,7 +2780,8 @@ private void startStages() throws IOException, InterruptedException
for (final StageId stageId : newStageIds) {
// Allocate segments, if this is the final stage of an ingestion.
if (MSQControllerTask.isIngestion(querySpec)
&& stageId.getStageNumber() == queryDef.getFinalStageDefinition().getStageNumber()) {
&& stageId.getStageNumber() == queryDef.getFinalStageDefinition().getStageNumber()
&& !((DataSourceMSQDestination) querySpec.getDestination()).doesSegmentMorphing()) {
populateSegmentsToGenerate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
package org.apache.druid.msq.indexing.destination;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.kernel.FrameProcessorFactory;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.server.security.Resource;
Expand All @@ -49,18 +51,24 @@ public class DataSourceMSQDestination implements MSQDestination
@Nullable
private final List<Interval> replaceTimeChunks;

@Nullable
@SuppressWarnings("rawtypes")
private final FrameProcessorFactory segmentMorphFactory;

@JsonCreator
public DataSourceMSQDestination(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("segmentSortOrder") @Nullable List<String> segmentSortOrder,
@JsonProperty("replaceTimeChunks") @Nullable List<Interval> replaceTimeChunks
@JsonProperty("replaceTimeChunks") @Nullable List<Interval> replaceTimeChunks,
@JsonProperty("segmentMorphFactory") @Nullable FrameProcessorFactory segmentMorphFactory
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.segmentGranularity = Preconditions.checkNotNull(segmentGranularity, "segmentGranularity");
this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder : Collections.emptyList();
this.replaceTimeChunks = replaceTimeChunks;
this.segmentMorphFactory = segmentMorphFactory;

if (replaceTimeChunks != null) {
// Verify that if replaceTimeChunks is provided, it is nonempty.
Expand Down Expand Up @@ -98,6 +106,30 @@ public String getDataSource()
return dataSource;
}

/**
 * Returns the segment morph factory, if one is present; null otherwise.
 * <p>
 * When present, the segment morph factory tells the MSQ task to funnel the results of the final stage into
 * the {@link FrameProcessorFactory} instead of into a segment-generation stage.
 * <p>
 * NOTE(review): serialized only when non-null (see {@code JsonInclude.Include.NON_NULL}) to keep older payloads
 * byte-compatible — confirm against wire-compatibility expectations.
 */
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public FrameProcessorFactory getSegmentMorphFactory()
{
return segmentMorphFactory;
}

/**
 * Whether this destination performs segment morphing, i.e. whether a {@link #getSegmentMorphFactory()} is
 * configured. When true, the query modifies existing segments rather than generating new ones.
 */
@JsonIgnore
public boolean doesSegmentMorphing()
{
  return this.segmentMorphFactory != null;
}

@JsonProperty
public Granularity getSegmentGranularity()
{
Expand Down Expand Up @@ -158,13 +190,14 @@ public boolean equals(Object o)
return Objects.equals(dataSource, that.dataSource)
&& Objects.equals(segmentGranularity, that.segmentGranularity)
&& Objects.equals(segmentSortOrder, that.segmentSortOrder)
&& Objects.equals(replaceTimeChunks, that.replaceTimeChunks);
&& Objects.equals(replaceTimeChunks, that.replaceTimeChunks)
&& Objects.equals(segmentMorphFactory, that.segmentMorphFactory);
}

@Override
public int hashCode()
{
return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks);
return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, segmentMorphFactory);
}

@Override
Expand All @@ -175,6 +208,7 @@ public String toString()
", segmentGranularity=" + segmentGranularity +
", segmentSortOrder=" + segmentSortOrder +
", replaceTimeChunks=" + replaceTimeChunks +
(segmentMorphFactory != null ? ", segmentMorphFactory=" + segmentMorphFactory : "") +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
targetDataSource.getDestinationName(),
segmentGranularityObject,
segmentSortOrder,
replaceTimeChunks
replaceTimeChunks,
null
);
MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext,
dataSourceMSQDestination.isReplaceTimeChunks());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public void testIngestWithSanitizedNullByte() throws IOException
new ColumnMapping("v1", "agent_category")
)
))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setQueryContext(DEFAULT_MSQ_CONTEXT)
Expand Down Expand Up @@ -318,7 +318,7 @@ public void testIngestWithSanitizedNullByteUsingContextParameter() throws IOExce
new ColumnMapping("agent_category", "agent_category")
)
))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setQueryContext(runtimeContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class MSQControllerTaskTest
"target",
Granularities.DAY,
null,
INTERVALS
INTERVALS,
null
))
.query(new Druids.ScanQueryBuilder()
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public class SqlStatementResourceTest extends MSQTestBase
"test",
Granularities.DAY,
null,
null,
null
))
.tuningConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ public void testEmptyCountersForDataSourceDestination()
"test",
Granularities.DAY,
null,
null,
null
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2033,14 +2033,6 @@ private static class TestIndexIO extends IndexIO
}
}

final Metadata metadata = new Metadata(
null,
aggregatorFactories.toArray(new AggregatorFactory[0]),
null,
null,
null
);

queryableIndexMap.put(
entry.getValue(),
new SimpleQueryableIndex(
Expand All @@ -2049,7 +2041,13 @@ private static class TestIndexIO extends IndexIO
null,
columnMap,
null,
metadata,
() -> new Metadata(
null,
aggregatorFactories.toArray(new AggregatorFactory[0]),
null,
null,
null
),
false
)
);
Expand All @@ -2074,7 +2072,7 @@ void removeMetadata(File file)
index.getBitmapFactoryForDimensions(),
index.getColumns(),
index.getFileMapper(),
null,
() -> null,
false
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
* under the License.
*/

package org.apache.druid.query.rowsandcols;
package org.apache.druid.common.semantic;

import org.apache.druid.query.rowsandcols.RowsAndColumns;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
Expand All @@ -26,8 +28,8 @@

/**
* Annotation used to indicate that the method is used as a creator for a semantic interface.
*
* Used in conjuction with {@link RowsAndColumns#makeAsMap(Class)} to build maps for simplified implementation of
* <p>
* Used in conjunction with {@link SemanticUtils#makeAsMap(Class)} to build maps for simplified implementation of
* the {@link RowsAndColumns#as(Class)} method.
*/
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.common.semantic;

import org.apache.druid.error.DruidException;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Utilities for wiring up "semantic" interfaces: given a class, builds a map from interface {@link Class} to a
 * function that adapts an instance of that class into an implementation of the interface. Candidate adapters are
 * discovered via zero-argument methods annotated with {@link SemanticCreator}, and may be supplemented by
 * externally registered overrides.
 */
public class SemanticUtils
{
  /**
   * Externally registered overrides: concrete class -> (requested interface -> adapter function).
   * NOTE(review): access is unsynchronized; assumes registration happens single-threaded during startup
   * (e.g. static initializers) before {@link #makeAsMap} is called — TODO confirm.
   */
  private static final Map<Class<?>, Map<Class<?>, Function<?, ?>>> OVERRIDES = new LinkedHashMap<>();

  private SemanticUtils()
  {
    // Utility class: static methods only, no instances.
  }

  /**
   * Registers an override adapter for a semantic interface on the given class. At most one override may be
   * registered per (class, interface) pair; a second registration for the same pair is a defensive error.
   *
   * @param clazz       the concrete class whose mapping is being overridden
   * @param asInterface the semantic interface to override
   * @param fn          adapter producing the interface implementation from an instance of {@code clazz}
   */
  @SuppressWarnings("unused")
  public static <C, T> void registerAsOverride(Class<C> clazz, Class<T> asInterface, Function<C, T> fn)
  {
    final Map<Class<?>, Function<?, ?>> classOverrides = OVERRIDES.computeIfAbsent(
        clazz,
        theClazz -> new LinkedHashMap<>()
    );

    final Function<?, ?> oldVal = classOverrides.get(asInterface);
    if (oldVal != null) {
      throw DruidException.defensive(
          "Attempt to side-override the same interface [%s] multiple times for the same class [%s].",
          asInterface,
          clazz
      );
    } else {
      classOverrides.put(asInterface, fn);
    }
  }

  /**
   * Builds the interface-to-adapter map for {@code clazz} by reflecting over its public methods annotated with
   * {@link SemanticCreator} (each of which must take no arguments), then layering any registered overrides on top.
   * If multiple annotated methods share a return type, the last one encountered wins silently.
   *
   * @param clazz the class to inspect
   * @return map from interface class to a function that invokes the corresponding creator method (or override)
   * @throws DruidException if an annotated method takes arguments, or (from a returned function) if invocation fails
   */
  public static <T> Map<Class<?>, Function<T, ?>> makeAsMap(Class<T> clazz)
  {
    final Map<Class<?>, Function<T, ?>> retVal = new HashMap<>();

    for (Method method : clazz.getMethods()) {
      if (method.isAnnotationPresent(SemanticCreator.class)) {
        if (method.getParameterCount() != 0) {
          throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method);
        }

        retVal.put(method.getReturnType(), arg -> {
          try {
            return method.invoke(arg);
          }
          catch (InvocationTargetException | IllegalAccessException e) {
            throw DruidException.defensive().build(e, "Problem invoking method [%s]", method);
          }
        });
      }
    }

    // Overrides take precedence over annotation-discovered creators.
    final Map<Class<?>, Function<?, ?>> classOverrides = OVERRIDES.get(clazz);
    if (classOverrides != null) {
      for (Map.Entry<Class<?>, Function<?, ?>> overrideEntry : classOverrides.entrySet()) {
        //noinspection unchecked
        retVal.put(overrideEntry.getKey(), (Function<T, ?>) overrideEntry.getValue());
      }
    }

    return retVal;
  }
}
Loading