Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.calcite.planner.PlannerContext;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -269,6 +270,20 @@ public static Map<String, Object> makeTaskContext(
.put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
.put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages());

// Put the lookup loading info in the task context to facilitate selective loading of lookups.
if (controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) != null) {
taskContextOverridesBuilder.put(
PlannerContext.CTX_LOOKUP_LOADING_MODE,
controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE)
);
}
if (controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) {
taskContextOverridesBuilder.put(
PlannerContext.CTX_LOOKUPS_TO_LOAD,
controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD)
);
}

if (querySpec.getDestination().toSelectDestination() != null) {
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.run.SqlResults;
Expand Down Expand Up @@ -333,4 +334,10 @@ public static boolean writeResultsToDurableStorage(final MSQSpec querySpec)
{
return querySpec.getDestination() instanceof DurableStorageMSQDestination;
}

@Override
public LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.NONE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
Expand All @@ -37,9 +38,13 @@
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerContext;
import org.apache.druid.msq.exec.WorkerImpl;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.planner.PlannerContext;

import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -185,4 +190,26 @@ public int hashCode()
{
return Objects.hash(super.hashCode(), controllerTaskId, workerNumber, retryCount, worker);
}

@Override
public LookupLoadingSpec getLookupLoadingSpec()
{
final Object lookupModeValue = getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE);
if (lookupModeValue == null) {
return LookupLoadingSpec.ALL;
}

final LookupLoadingSpec.Mode lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString());
if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) {
return LookupLoadingSpec.NONE;
} else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) {
Collection<String> lookupsToLoad = (Collection<String>) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD);
if (lookupsToLoad == null || lookupsToLoad.isEmpty()) {
throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad);
}
return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad));
} else {
return LookupLoadingSpec.ALL;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
Expand Down Expand Up @@ -283,6 +284,12 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)

MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);

final Map<String, Object> context = new HashMap<>();
context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, plannerContext.getLookupLoadingSpec().getMode());
if (plannerContext.getLookupLoadingSpec().getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED) {
context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad());
}

final MSQControllerTask controllerTask = new MSQControllerTask(
taskId,
querySpec.withOverriddenContext(nativeQueryContext),
Expand All @@ -291,7 +298,7 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
SqlResults.Context.fromPlannerContext(plannerContext),
sqlTypeNames,
columnTypeList,
null
context
);

FutureUtils.getUnchecked(overlordClient.runTask(taskId, controllerTask), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.JsonInputFormat;
Expand Down Expand Up @@ -70,6 +71,7 @@
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.filtration.Filtration;
Expand Down Expand Up @@ -227,7 +229,9 @@ public void testSelectOnFoo(String contextName, Map<String, Object> context)
new Object[]{1L, "1"},
new Object[]{1L, "def"},
new Object[]{1L, "abc"}
)).verifyResults();
))
.setExpectedLookupLoadingSpec(LookupLoadingSpec.NONE)
.verifyResults();
}

@MethodSource("data")
Expand Down Expand Up @@ -742,6 +746,7 @@ public void testSelectLookup(String contextName, Map<String, Object> context)
.build())
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(new Object[]{4L}))
.setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo")))
.verifyResults();
}

Expand Down Expand Up @@ -808,6 +813,7 @@ public void testJoinWithLookup(String contextName, Map<String, Object> context)
new Object[]{"xabc", 1L}
)
)
.setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo")))
.verifyResults();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.msq.indexing;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
Expand All @@ -34,12 +35,15 @@
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -84,6 +88,55 @@ public void testGetInputSourceResources()
Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty());
}

@Test
public void testGetDefaultLookupLoadingSpec()
{
MSQControllerTask controllerTask = new MSQControllerTask(
null,
MSQ_SPEC,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec());
}

@Test
public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext()
{
MSQSpec build = MSQSpec
.builder()
.query(new Druids.ScanQueryBuilder()
.intervals(new MultipleIntervalSegmentSpec(INTERVALS))
.dataSource("target")
.context(
ImmutableMap.of(
PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"),
PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED)
)
.build()
)
.columnMappings(new ColumnMappings(Collections.emptyList()))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build();
MSQControllerTask controllerTask = new MSQControllerTask(
null,
build,
null,
null,
null,
null,
null,
null
);

// Va;idate that MSQ Controller task doesn't load any lookups even if context has lookup info populated.
Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec());
}

@Test
public void testGetTaskAllocatorId()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@
package org.apache.druid.msq.indexing;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.DruidException;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

Expand All @@ -47,7 +54,6 @@ public class MSQWorkerTaskTest
@Test
public void testEquals()
{
Assert.assertEquals(msqWorkerTask, msqWorkerTask);
Assert.assertEquals(
msqWorkerTask,
new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount)
Expand Down Expand Up @@ -108,4 +114,61 @@ public void testGetInputSourceResources()
MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
}

@Test
public void testGetDefaultLookupLoadingSpec()
{
MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
Assert.assertEquals(LookupLoadingSpec.ALL, msqWorkerTask.getLookupLoadingSpec());
}

@Test
public void testGetLookupLoadingWithModeNoneInContext()
{
final ImmutableMap<String, Object> context = ImmutableMap.of(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE);
MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
Assert.assertEquals(LookupLoadingSpec.NONE, msqWorkerTask.getLookupLoadingSpec());
}

@Test
public void testGetLookupLoadingSpecWithLookupListInContext()
{
final ImmutableMap<String, Object> context = ImmutableMap.of(
PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"),
PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED);
MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, msqWorkerTask.getLookupLoadingSpec().getMode());
Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), msqWorkerTask.getLookupLoadingSpec().getLookupsToLoad());
}

@Test
public void testGetLookupLoadingSpecWithInvalidInput()
{
final HashMap<String, Object> context = new HashMap<>();
context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED);

// Setting CTX_LOOKUPS_TO_LOAD as null
context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null);

MSQWorkerTask taskWithNullLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
DruidException exception = Assert.assertThrows(
DruidException.class,
taskWithNullLookups::getLookupLoadingSpec
);
Assert.assertEquals(
"Set of lookups to load cannot be null for mode[ONLY_REQUIRED].",
exception.getMessage());

// Setting CTX_LOOKUPS_TO_LOAD as empty list
context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList());

MSQWorkerTask taskWithEmptyLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
exception = Assert.assertThrows(
DruidException.class,
taskWithEmptyLookups::getLookupLoadingSpec
);
Assert.assertEquals(
"Set of lookups to load cannot be [] for mode[ONLY_REQUIRED].",
exception.getMessage());
}
}
Loading