From b8ce9e51c31b1779120b86646bb109dacf740cf6 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Tue, 30 Apr 2024 12:34:10 +0530 Subject: [PATCH 01/14] Load only the required lookups for MSQControllerTask and MSQWorkerTask --- .../apache/druid/msq/exec/ControllerImpl.java | 1 + .../druid/msq/indexing/MSQControllerTask.java | 14 ++++++ .../druid/msq/indexing/MSQWorkerTask.java | 15 ++++++ .../msq/indexing/MSQControllerTaskTest.java | 49 +++++++++++++++++++ .../druid/msq/indexing/MSQWorkerTaskTest.java | 20 ++++++++ .../apache/druid/msq/test/MSQTestBase.java | 25 +++++++++- .../QueryLookupOperatorConversion.java | 10 +++- .../sql/calcite/planner/PlannerContext.java | 1 + .../planner/SqlResourceCollectorShuttle.java | 11 ++++- 9 files changed, 143 insertions(+), 3 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index db7c6838ba7e..73c65c4ae9f3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -195,6 +195,7 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.DruidNode; import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.storage.ExportStorageProvider; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 2a4352151441..3465fee9d402 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -58,13 +58,16 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.run.SqlResults; import org.joda.time.Interval; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -333,4 +336,15 @@ public static boolean writeResultsToDurableStorage(final MSQSpec querySpec) { return querySpec.getDestination() instanceof DurableStorageMSQDestination; } + + @Override + public LookupLoadingSpec getLookupLoadingSpec() + { + if (getQuerySpec().getQuery().getContext().containsKey(PlannerContext.CTX_LOOKUPS_TO_LOAD)) { + List lookupsToLoad = (List) getQuerySpec().getQuery().getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); + return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); + } else { + return LookupLoadingSpec.NONE; + } + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index b4d18ea390e9..1405cd3b570a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -37,9 +37,13 @@ import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.exec.WorkerContext; import org.apache.druid.msq.exec.WorkerImpl; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nonnull; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -185,4 +189,15 @@ public int hashCode() { return Objects.hash(super.hashCode(), controllerTaskId, workerNumber, retryCount, worker); } + + @Override + public LookupLoadingSpec getLookupLoadingSpec() + { + if (getContext().containsKey(PlannerContext.CTX_LOOKUPS_TO_LOAD)) { + List lookupsToLoad = (List) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); + return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); + } else { + return LookupLoadingSpec.NONE; + } + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index e0eee251f728..2d6d9d1202d9 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -20,6 +20,8 @@ package org.apache.druid.msq.indexing; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; @@ -34,12 +36,15 @@ import org.apache.druid.query.Druids; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.apache.druid.sql.calcite.planner.PlannerContext; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -84,6 +89,50 @@ public void testGetInputSourceResources() Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty()); } + @Test + public void testGetDefaultLookupLoadingSpec() + { + MSQControllerTask controllerTask = new MSQControllerTask( + null, + MSQ_SPEC, + null, + null, + null, + null, + null, + null + ); + Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); + } + + @Test + public void testGetLookupLoadingSpecUsingContext() + { + MSQSpec build = MSQSpec + .builder() + .query(new Druids.ScanQueryBuilder() + .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) + .dataSource("target") + .context(ImmutableMap.of(PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"))) + .build() + ) + .columnMappings(new ColumnMappings(Collections.emptyList())) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .build(); + MSQControllerTask controllerTask = new MSQControllerTask( + null, + build, + null, + null, + null, + null, + null, + null + ); + Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, controllerTask.getLookupLoadingSpec().getMode()); + Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), controllerTask.getLookupLoadingSpec().getLookupsToLoad()); + } + @Test public void testGetTaskAllocatorId() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 6eff77184ea7..e620f4baf290 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -20,9 +20,13 @@ package org.apache.druid.msq.indexing; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; +import org.apache.druid.sql.calcite.planner.PlannerContext; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -108,4 +112,20 @@ public void testGetInputSourceResources() MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty()); } + + @Test + public void testGetDefaultLookupLoadingSpec() + { + MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + Assert.assertEquals(LookupLoadingSpec.NONE, msqWorkerTask.getLookupLoadingSpec()); + } + + @Test + public void testGetLookupLoadingSpecUsingContext() + { + final ImmutableMap context = ImmutableMap.of(PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2")); + MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, msqWorkerTask.getLookupLoadingSpec().getMode()); + Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), msqWorkerTask.getLookupLoadingSpec().getLookupsToLoad()); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index fe78b481bee4..0dbe81543c8b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -119,6 +119,7 @@ import org.apache.druid.msq.util.SqlStatementResourceHelper; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ForwardingQueryProcessingPool; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -807,12 +808,34 @@ private MSQErrorReport getErrorReportOrThrow(String controllerTaskId) private void assertMSQSpec(MSQSpec expectedMSQSpec, MSQSpec querySpecForTask) { - Assert.assertEquals(expectedMSQSpec.getQuery(), querySpecForTask.getQuery()); + assertMSQSpecQuery(expectedMSQSpec.getQuery(), querySpecForTask.getQuery()); Assert.assertEquals(expectedMSQSpec.getAssignmentStrategy(), querySpecForTask.getAssignmentStrategy()); Assert.assertEquals(expectedMSQSpec.getColumnMappings(), querySpecForTask.getColumnMappings()); Assert.assertEquals(expectedMSQSpec.getDestination(), querySpecForTask.getDestination()); } + private void assertMSQSpecQuery(Query msqSpecQuery, Query taskSpecQuery) + { + Assert.assertEquals(msqSpecQuery.getId(), taskSpecQuery.getId()); + Assert.assertEquals(msqSpecQuery.getType(), taskSpecQuery.getType()); + Assert.assertEquals(msqSpecQuery.getSubQueryId(), taskSpecQuery.getSubQueryId()); + Assert.assertEquals(msqSpecQuery.getSqlQueryId(), taskSpecQuery.getSqlQueryId()); + Assert.assertEquals(msqSpecQuery.getIntervals(), taskSpecQuery.getIntervals()); + Assert.assertEquals(msqSpecQuery.getDataSource(), taskSpecQuery.getDataSource()); + Assert.assertEquals(msqSpecQuery.getFilter(), taskSpecQuery.getFilter()); + Assert.assertEquals(msqSpecQuery.getDuration(), taskSpecQuery.getDuration()); + Assert.assertEquals(msqSpecQuery.getGranularity(), taskSpecQuery.getGranularity()); + Assert.assertEquals(msqSpecQuery.getTimezone(), taskSpecQuery.getTimezone()); + Assert.assertEquals(msqSpecQuery.getRequiredColumns(), taskSpecQuery.getRequiredColumns()); + Assert.assertEquals(msqSpecQuery.getVirtualColumns(), taskSpecQuery.getVirtualColumns()); + + // taskSpecQuery's context should have all key-value pairs from msqSpecQuery's context. + Map msqSpecQueryContext = msqSpecQuery.getContext(); + for (Map.Entry entry : msqSpecQueryContext.entrySet()) { + Assert.assertEquals(msqSpecQueryContext.get(entry.getKey()), taskSpecQuery.getContext().get(entry.getKey())); + } + } + private void assertTuningConfig( MSQTuningConfig expectedTuningConfig, MSQTuningConfig tuningConfig diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java index 9075456c812d..ca489c833573 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java @@ -37,7 +37,9 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rule.ReverseLookupRule; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class QueryLookupOperatorConversion implements SqlOperatorConversion { @@ -81,12 +83,18 @@ public DruidExpression toDruidExpression( final DruidExpression arg = inputExpressions.get(0); final Expr lookupNameExpr = plannerContext.parseExpression(inputExpressions.get(1).getExpression()); final String replaceMissingValueWith = getReplaceMissingValueWith(inputExpressions, plannerContext); + String lookupName = (String) lookupNameExpr.getLiteralValue(); + + // Put the lookup names in the query context to facilitate selective loading of lookups. + plannerContext.queryContextMap().putIfAbsent(PlannerContext.CTX_LOOKUPS_TO_LOAD, new HashSet<>()); + Set lookupsToLoad = (Set) plannerContext.queryContextMap().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); + lookupsToLoad.add(lookupName); if (arg.isSimpleExtraction() && lookupNameExpr.isLiteral()) { return arg.getSimpleExtraction().cascade( new RegisteredLookupExtractionFn( lookupExtractorFactoryContainerProvider, - (String) lookupNameExpr.getLiteralValue(), + lookupName, false, replaceMissingValueWith, null, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 281fc66c8aaa..948056bc113c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -78,6 +78,7 @@ public class PlannerContext public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp"; public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone"; public static final String CTX_SQL_JOIN_ALGORITHM = "sqlJoinAlgorithm"; + public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad"; private static final JoinAlgorithm DEFAULT_SQL_JOIN_ALGORITHM = JoinAlgorithm.BROADCAST; /** diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java index 631936972e10..b426de6f3ec6 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java @@ -33,6 +33,7 @@ import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import org.apache.druid.sql.calcite.expression.AuthorizableOperator; +import org.apache.druid.sql.calcite.schema.NamedLookupSchema; import java.util.HashSet; import java.util.List; @@ -87,13 +88,21 @@ public SqlNode visit(SqlIdentifier id) if (qualifiedNameParts.size() == 2) { final String schema = qualifiedNameParts.get(0); final String resourceName = qualifiedNameParts.get(1); + + // Put the lookup names in the query context to facilitate selective loading of lookups. + if (schema.equals(NamedLookupSchema.NAME)) { + plannerContext.queryContextMap().putIfAbsent(PlannerContext.CTX_LOOKUPS_TO_LOAD, new HashSet<>()); + Set lookupsToLoad = (Set) plannerContext.queryContextMap().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); + lookupsToLoad.add(resourceName); + } + final String resourceType = plannerContext.getSchemaResourceType(schema, resourceName); if (resourceType != null) { resourceActions.add(new ResourceAction(new Resource(resourceName, resourceType), Action.READ)); } } else if (qualifiedNameParts.size() > 2) { // Don't expect to see more than 2 names (catalog?). - throw new ISE("Cannot analyze table idetifier %s", qualifiedNameParts); + throw new ISE("Cannot analyze table identifier %s", qualifiedNameParts); } } } From 6fb096047863b24beee612f87c8ed61c4c3dd2d9 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 2 May 2024 13:52:43 +0530 Subject: [PATCH 02/14] Rebase with master and resolve conflicts --- .../druid/msq/indexing/IndexerControllerContext.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 3ff71c3e1b77..e8bbbbdcd5df 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -55,6 +55,7 @@ import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.server.DruidNode; +import org.apache.druid.sql.calcite.planner.PlannerContext; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -269,6 +270,14 @@ public static Map makeTaskContext( .put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec)) .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages()); + // Put the lookup names in the query context to facilitate selective loading of lookups. + if (querySpec.getQuery().getContext().containsKey(PlannerContext.CTX_LOOKUPS_TO_LOAD) && querySpec.getQuery().getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) { + taskContextOverridesBuilder.put( + PlannerContext.CTX_LOOKUPS_TO_LOAD, + querySpec.getQuery().getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD) + ); + } + if (querySpec.getDestination().toSelectDestination() != null) { taskContextOverridesBuilder.put( MultiStageQueryContext.CTX_SELECT_DESTINATION, From d767034d2f7fe590979637aedcf9a53e54537971 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 2 May 2024 15:10:55 +0530 Subject: [PATCH 03/14] Fix checkstyle --- .../src/main/java/org/apache/druid/msq/exec/ControllerImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 73c65c4ae9f3..db7c6838ba7e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -195,7 +195,6 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.DruidNode; import org.apache.druid.sql.calcite.planner.ColumnMappings; -import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.storage.ExportStorageProvider; From 582f90ce52887c2b76fa206aa0d5d7a8660539a7 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 2 May 2024 17:52:30 +0530 Subject: [PATCH 04/14] Address review comments --- .../indexing/IndexerControllerContext.java | 2 +- .../druid/msq/indexing/MSQControllerTask.java | 4 +-- .../druid/msq/indexing/MSQWorkerTask.java | 4 +-- .../msq/indexing/MSQControllerTaskTest.java | 29 ++++++++++++++++++- .../druid/msq/indexing/MSQWorkerTaskTest.java | 11 ++++++- .../QueryLookupOperatorConversion.java | 2 +- .../planner/SqlResourceCollectorShuttle.java | 5 +++- 7 files changed, 48 insertions(+), 9 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index e8bbbbdcd5df..c2450bea47bb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -271,7 +271,7 @@ public static Map makeTaskContext( .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages()); // Put the lookup names in the query context to facilitate selective loading of lookups. - if (querySpec.getQuery().getContext().containsKey(PlannerContext.CTX_LOOKUPS_TO_LOAD) && querySpec.getQuery().getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) { + if (querySpec.getQuery().getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) { taskContextOverridesBuilder.put( PlannerContext.CTX_LOOKUPS_TO_LOAD, querySpec.getQuery().getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 3465fee9d402..63c7f06b2dee 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -342,9 +342,9 @@ public LookupLoadingSpec getLookupLoadingSpec() { if (getQuerySpec().getQuery().getContext().containsKey(PlannerContext.CTX_LOOKUPS_TO_LOAD)) { List lookupsToLoad = (List) getQuerySpec().getQuery().getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); - return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); + return lookupsToLoad.isEmpty() ? LookupLoadingSpec.NONE : LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); } else { - return LookupLoadingSpec.NONE; + return LookupLoadingSpec.ALL; } } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index 1405cd3b570a..ff5aeffa429e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -195,9 +195,9 @@ public LookupLoadingSpec getLookupLoadingSpec() { if (getContext().containsKey(PlannerContext.CTX_LOOKUPS_TO_LOAD)) { List lookupsToLoad = (List) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); - return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); + return lookupsToLoad.isEmpty() ? LookupLoadingSpec.NONE : LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); } else { - return LookupLoadingSpec.NONE; + return LookupLoadingSpec.ALL; } } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index 2d6d9d1202d9..f460d0b051e8 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -102,11 +102,38 @@ public void testGetDefaultLookupLoadingSpec() null, null ); + Assert.assertEquals(LookupLoadingSpec.ALL, controllerTask.getLookupLoadingSpec()); + } + + @Test + public void testGetLookupLoadingSpecUsingEmptyListInContext() + { + MSQSpec build = MSQSpec + .builder() + .query(new Druids.ScanQueryBuilder() + .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) + .dataSource("target") + .context(ImmutableMap.of(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList())) + .build() + ) + .columnMappings(new ColumnMappings(Collections.emptyList())) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .build(); + MSQControllerTask controllerTask = new MSQControllerTask( + null, + build, + null, + null, + null, + null, + null, + null + ); Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); } @Test - public void testGetLookupLoadingSpecUsingContext() + public void testGetLookupLoadingSpecUsingNonEmptyListInContext() { MSQSpec build = MSQSpec .builder() diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index e620f4baf290..d8ad50ed2c84 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -27,6 +27,7 @@ import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -116,12 +117,20 @@ public void testGetInputSourceResources() @Test public void testGetDefaultLookupLoadingSpec() { + MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + Assert.assertEquals(LookupLoadingSpec.ALL, msqWorkerTask.getLookupLoadingSpec()); + } + + @Test + public void testGetLookupLoadingSpecUsingEmptyListInContext() + { + final ImmutableMap context = ImmutableMap.of(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList()); MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); Assert.assertEquals(LookupLoadingSpec.NONE, msqWorkerTask.getLookupLoadingSpec()); } @Test - public void testGetLookupLoadingSpecUsingContext() + public void testGetLookupLoadingSpecUsingNonEmptyListInContext() { final ImmutableMap context = ImmutableMap.of(PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2")); MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java index ca489c833573..5fa6bc017d56 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java @@ -83,7 +83,7 @@ public DruidExpression toDruidExpression( final DruidExpression arg = inputExpressions.get(0); final Expr lookupNameExpr = plannerContext.parseExpression(inputExpressions.get(1).getExpression()); final String replaceMissingValueWith = getReplaceMissingValueWith(inputExpressions, plannerContext); - String lookupName = (String) lookupNameExpr.getLiteralValue(); + final String lookupName = (String) lookupNameExpr.getLiteralValue(); // Put the lookup names in the query context to facilitate selective loading of lookups. plannerContext.queryContextMap().putIfAbsent(PlannerContext.CTX_LOOKUPS_TO_LOAD, new HashSet<>()); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java index b426de6f3ec6..f33f08f6827c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java @@ -79,6 +79,10 @@ public SqlNode visit(SqlIdentifier id) // raw tables and views and such will have a IdentifierNamespace // since we are scoped to identifiers here, we should only pick up these SqlValidatorNamespace namespace = validator.getNamespace(id); + + // Put an empty set to facilitate loading no lookups by default. + plannerContext.queryContextMap().putIfAbsent(PlannerContext.CTX_LOOKUPS_TO_LOAD, new HashSet<>()); + if (namespace != null && namespace.isWrapperFor(IdentifierNamespace.class)) { SqlValidatorTable validatorTable = namespace.getTable(); // this should not probably be null if the namespace was not null, @@ -91,7 +95,6 @@ public SqlNode visit(SqlIdentifier id) // Put the lookup names in the query context to facilitate selective loading of lookups. if (schema.equals(NamedLookupSchema.NAME)) { - plannerContext.queryContextMap().putIfAbsent(PlannerContext.CTX_LOOKUPS_TO_LOAD, new HashSet<>()); Set lookupsToLoad = (Set) plannerContext.queryContextMap().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); lookupsToLoad.add(resourceName); } From ccaeb6463459eaea9dfa3f77cd506bfb89aa8675 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 2 May 2024 18:09:32 +0530 Subject: [PATCH 05/14] Address review comment: Use computeIfAbsent --- .../expression/builtin/QueryLookupOperatorConversion.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java index 5fa6bc017d56..594137105280 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java @@ -86,8 +86,8 @@ public DruidExpression toDruidExpression( final String lookupName = (String) lookupNameExpr.getLiteralValue(); // Put the lookup names in the query context to facilitate selective loading of lookups. - plannerContext.queryContextMap().putIfAbsent(PlannerContext.CTX_LOOKUPS_TO_LOAD, new HashSet<>()); - Set lookupsToLoad = (Set) plannerContext.queryContextMap().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); + Set lookupsToLoad = (Set) plannerContext.queryContextMap() + .computeIfAbsent(PlannerContext.CTX_LOOKUPS_TO_LOAD, key -> new HashSet<>()); lookupsToLoad.add(lookupName); if (arg.isSimpleExtraction() && lookupNameExpr.isLiteral()) { From 21917cbb9312653e543941ea34b070b70bae8069 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 3 May 2024 13:48:19 +0530 Subject: [PATCH 06/14] Migrate to using new field in planner context instead of queryContext --- .../indexing/IndexerControllerContext.java | 12 ++++-- .../druid/msq/indexing/MSQControllerTask.java | 10 +---- .../druid/msq/indexing/MSQWorkerTask.java | 11 ++++- .../druid/msq/sql/MSQTaskQueryMaker.java | 13 +++++- .../msq/indexing/MSQControllerTaskTest.java | 41 ++++--------------- .../druid/msq/indexing/MSQWorkerTaskTest.java | 12 +++--- .../QueryLookupOperatorConversion.java | 8 +--- .../sql/calcite/planner/PlannerContext.java | 12 ++++++ .../planner/SqlResourceCollectorShuttle.java | 9 +--- 9 files changed, 62 insertions(+), 66 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index c2450bea47bb..e8fba09ddc51 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -270,11 +270,17 @@ public static Map makeTaskContext( .put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec)) .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages()); - // Put the lookup names in the query context to facilitate selective loading of lookups. - if (querySpec.getQuery().getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) { + // Put the lookup loading info in the task context to facilitate selective loading of lookups. + if (controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) != null) { + taskContextOverridesBuilder.put( + PlannerContext.CTX_LOOKUP_LOADING_MODE, + controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) + ); + } + if (controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) { taskContextOverridesBuilder.put( PlannerContext.CTX_LOOKUPS_TO_LOAD, - querySpec.getQuery().getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD) + controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 63c7f06b2dee..aeef49503483 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -61,13 +61,11 @@ import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; -import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.run.SqlResults; import org.joda.time.Interval; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -141,7 +139,6 @@ public MSQControllerTask( this.sqlResultsContext = sqlResultsContext; this.sqlTypeNames = sqlTypeNames; this.nativeTypeNames = nativeTypeNames; - addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true); } @@ -340,11 +337,6 @@ public static boolean writeResultsToDurableStorage(final MSQSpec querySpec) @Override public LookupLoadingSpec getLookupLoadingSpec() { - if (getQuerySpec().getQuery().getContext().containsKey(PlannerContext.CTX_LOOKUPS_TO_LOAD)) { - List lookupsToLoad = (List) getQuerySpec().getQuery().getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); - return lookupsToLoad.isEmpty() ? LookupLoadingSpec.NONE : LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); - } else { - return LookupLoadingSpec.ALL; - } + return LookupLoadingSpec.NONE; } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index ff5aeffa429e..d0944a2cd21b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -193,9 +193,16 @@ public int hashCode() @Override public LookupLoadingSpec getLookupLoadingSpec() { - if (getContext().containsKey(PlannerContext.CTX_LOOKUPS_TO_LOAD)) { + if (!getContext().containsKey(PlannerContext.CTX_LOOKUP_LOADING_MODE) || getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE) == null) { + return LookupLoadingSpec.ALL; + } + + String lookupLoadingMode = getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE).toString(); + if (lookupLoadingMode.equals(LookupLoadingSpec.Mode.NONE.toString())) { + return LookupLoadingSpec.NONE; + } else if (lookupLoadingMode.equals(LookupLoadingSpec.Mode.ONLY_REQUIRED.toString())) { List lookupsToLoad = (List) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); - return lookupsToLoad.isEmpty() ? LookupLoadingSpec.NONE : LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); + return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); } else { return LookupLoadingSpec.ALL; } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 8cc34547f990..d295f445516c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -52,6 +52,7 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.server.QueryResponse; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlReplace; @@ -283,6 +284,16 @@ public QueryResponse runQuery(final DruidQuery druidQuery) MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec); + Map context = new HashMap<>(); + if (plannerContext.getLookupsToLoad() != null) { + if (plannerContext.getLookupsToLoad().isEmpty()) { + context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE); + } else { + context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); + context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupsToLoad()); + } + } + final MSQControllerTask controllerTask = new MSQControllerTask( taskId, querySpec.withOverriddenContext(nativeQueryContext), @@ -291,7 +302,7 @@ public QueryResponse runQuery(final DruidQuery druidQuery) SqlResults.Context.fromPlannerContext(plannerContext), sqlTypeNames, columnTypeList, - null + context ); FutureUtils.getUnchecked(overlordClient.runTask(taskId, controllerTask), true); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index f460d0b051e8..a5001fb58ebd 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; @@ -102,45 +101,22 @@ public void testGetDefaultLookupLoadingSpec() null, null ); - Assert.assertEquals(LookupLoadingSpec.ALL, controllerTask.getLookupLoadingSpec()); - } - - @Test - public void testGetLookupLoadingSpecUsingEmptyListInContext() - { - MSQSpec build = MSQSpec - .builder() - .query(new Druids.ScanQueryBuilder() - .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) - .dataSource("target") - .context(ImmutableMap.of(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList())) - .build() - ) - .columnMappings(new ColumnMappings(Collections.emptyList())) - .tuningConfig(MSQTuningConfig.defaultConfig()) - .build(); - MSQControllerTask controllerTask = new MSQControllerTask( - null, - build, - null, - null, - null, - null, - null, - null - ); Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); } @Test - public void testGetLookupLoadingSpecUsingNonEmptyListInContext() + public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext() { MSQSpec build = MSQSpec .builder() .query(new Druids.ScanQueryBuilder() .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) .dataSource("target") - .context(ImmutableMap.of(PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"))) + .context( + ImmutableMap.of( + PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), + PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED) + ) .build() ) .columnMappings(new ColumnMappings(Collections.emptyList())) @@ -156,8 +132,9 @@ public void testGetLookupLoadingSpecUsingNonEmptyListInContext() null, null ); - Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, controllerTask.getLookupLoadingSpec().getMode()); - Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), controllerTask.getLookupLoadingSpec().getLookupsToLoad()); + + // Va;idate that MSQ Controller task doesn't load any lookups even if context has lookup info populated. + Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); } @Test diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index d8ad50ed2c84..6d00ce539feb 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -27,7 +27,6 @@ import org.junit.Test; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -52,7 +51,6 @@ public class MSQWorkerTaskTest @Test public void testEquals() { - Assert.assertEquals(msqWorkerTask, msqWorkerTask); Assert.assertEquals( msqWorkerTask, new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount) @@ -122,17 +120,19 @@ public void testGetDefaultLookupLoadingSpec() } @Test - public void testGetLookupLoadingSpecUsingEmptyListInContext() + public void testGetLookupLoadingSpecUsingLookupLoadingModeNoneInContext() { - final ImmutableMap context = ImmutableMap.of(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList()); + final ImmutableMap context = ImmutableMap.of(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE); MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); Assert.assertEquals(LookupLoadingSpec.NONE, msqWorkerTask.getLookupLoadingSpec()); } @Test - public void testGetLookupLoadingSpecUsingNonEmptyListInContext() + public void testGetLookupLoadingSpecUsingNonEmptyLookupListInContext() { - final ImmutableMap context = ImmutableMap.of(PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2")); + final ImmutableMap context = ImmutableMap.of( + PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), + PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, msqWorkerTask.getLookupLoadingSpec().getMode()); Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), msqWorkerTask.getLookupLoadingSpec().getLookupsToLoad()); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java index 594137105280..3cd8697a2ddf 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java @@ -37,9 +37,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rule.ReverseLookupRule; -import java.util.HashSet; import java.util.List; -import java.util.Set; public class QueryLookupOperatorConversion implements SqlOperatorConversion { @@ -85,10 +83,8 @@ public DruidExpression toDruidExpression( final String replaceMissingValueWith = getReplaceMissingValueWith(inputExpressions, plannerContext); final String lookupName = (String) lookupNameExpr.getLiteralValue(); - // Put the lookup names in the query context to facilitate selective loading of lookups. - Set lookupsToLoad = (Set) plannerContext.queryContextMap() - .computeIfAbsent(PlannerContext.CTX_LOOKUPS_TO_LOAD, key -> new HashSet<>()); - lookupsToLoad.add(lookupName); + // Collect the lookup names to facilitate selective loading of lookups. + plannerContext.getLookupsToLoad().add(lookupName); if (arg.isSimpleExtraction() && lookupNameExpr.isLiteral()) { return arg.getSimpleExtraction().cascade( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 948056bc113c..99e657ab7023 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -61,6 +61,7 @@ import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -78,6 +79,7 @@ public class PlannerContext public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp"; public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone"; public static final String CTX_SQL_JOIN_ALGORITHM = "sqlJoinAlgorithm"; + public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode"; public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad"; private static final JoinAlgorithm DEFAULT_SQL_JOIN_ALGORITHM = JoinAlgorithm.BROADCAST; @@ -143,6 +145,8 @@ public class PlannerContext // set of attributes for a SQL statement used in the EXPLAIN PLAN output private ExplainAttributes explainAttributes; private PlannerLookupCache lookupCache; + // set of lookups to load for a given task + private final Set lookupsToLoad = new HashSet<>(); private PlannerContext( final PlannerToolbox plannerToolbox, @@ -344,6 +348,14 @@ public String getSchemaResourceType(String schema, String resourceName) return plannerToolbox.rootSchema().getResourceType(schema, resourceName); } + /** + * Returns the set of lookups to laod for a given task. + */ + public Set getLookupsToLoad() + { + return lookupsToLoad; + } + /** * Return the query context as a mutable map. Use this form when * modifying the context during planning. diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java index f33f08f6827c..bc91bece6fbf 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java @@ -79,10 +79,6 @@ public SqlNode visit(SqlIdentifier id) // raw tables and views and such will have a IdentifierNamespace // since we are scoped to identifiers here, we should only pick up these SqlValidatorNamespace namespace = validator.getNamespace(id); - - // Put an empty set to facilitate loading no lookups by default. - plannerContext.queryContextMap().putIfAbsent(PlannerContext.CTX_LOOKUPS_TO_LOAD, new HashSet<>()); - if (namespace != null && namespace.isWrapperFor(IdentifierNamespace.class)) { SqlValidatorTable validatorTable = namespace.getTable(); // this should not probably be null if the namespace was not null, @@ -93,10 +89,9 @@ public SqlNode visit(SqlIdentifier id) final String schema = qualifiedNameParts.get(0); final String resourceName = qualifiedNameParts.get(1); - // Put the lookup names in the query context to facilitate selective loading of lookups. + // Collect the lookup names to facilitate selective loading of lookups. if (schema.equals(NamedLookupSchema.NAME)) { - Set lookupsToLoad = (Set) plannerContext.queryContextMap().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); - lookupsToLoad.add(resourceName); + plannerContext.getLookupsToLoad().add(resourceName); } final String resourceType = plannerContext.getSchemaResourceType(schema, resourceName); From 801070fe5a368850a0445817fb8f9a5d130207d5 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 3 May 2024 16:23:09 +0530 Subject: [PATCH 07/14] Update MSQ tests to verify the lookup loading info populated in the task context --- .../apache/druid/msq/exec/MSQSelectTest.java | 8 +++++- .../apache/druid/msq/test/MSQTestBase.java | 27 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 56f1ce986965..84dddd526c1d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.JsonInputFormat; @@ -70,6 +71,7 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinType; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.external.ExternalDataSource; import org.apache.druid.sql.calcite.filtration.Filtration; @@ -227,7 +229,9 @@ public void testSelectOnFoo(String contextName, Map context) new Object[]{1L, "1"}, new Object[]{1L, "def"}, new Object[]{1L, "abc"} - )).verifyResults(); + )) + .setExpectedLookupLoadingSpec(LookupLoadingSpec.NONE) + .verifyResults(); } @MethodSource("data") @@ -742,6 +746,7 @@ public void testSelectLookup(String contextName, Map context) .build()) .setExpectedRowSignature(rowSignature) .setExpectedResultRows(ImmutableList.of(new Object[]{4L})) + .setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo"))) .verifyResults(); } @@ -808,6 +813,7 @@ public void testJoinWithLookup(String contextName, Map context) new Object[]{"xabc", 1L} ) ) + .setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo"))) .verifyResults(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 0dbe81543c8b..391b825d05e0 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -154,6 +154,7 @@ import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.sql.DirectStatement; @@ -876,6 +877,7 @@ public abstract class MSQTester> protected CompactionState expectedLastCompactionState = null; protected Set expectedTombstoneIntervals = null; protected List expectedResultRows = null; + protected LookupLoadingSpec expectedLookupLoadingSpec = null; protected Matcher expectedValidationErrorMatcher = null; protected List, String>> adhocReportAssertionAndReasons = new ArrayList<>(); protected Matcher expectedExecutionErrorMatcher = null; @@ -940,6 +942,12 @@ public Builder setExpectedResultRows(List expectedResultRows) return asBuilder(); } + public Builder setExpectedLookupLoadingSpec(LookupLoadingSpec lookupLoadingSpec) + { + this.expectedLookupLoadingSpec = lookupLoadingSpec; + return asBuilder(); + } + public Builder setExpectedMSQSpec(MSQSpec expectedMSQSpec) { this.expectedMSQSpec = expectedMSQSpec; @@ -1033,6 +1041,23 @@ public void verifyPlanningErrors() assertThat(e, expectedValidationErrorMatcher); } + protected void verifyLookupLoadingInfoInTaskContext(Map context) + { + String lookupLoadingMode = context.get(PlannerContext.CTX_LOOKUP_LOADING_MODE).toString(); + List lookupsToLoad = (List) context.get(PlannerContext.CTX_LOOKUPS_TO_LOAD); + if (expectedLookupLoadingSpec != null) { + Assert.assertEquals(expectedLookupLoadingSpec.getMode().toString(), lookupLoadingMode); + if (expectedLookupLoadingSpec.getMode().equals(LookupLoadingSpec.Mode.ONLY_REQUIRED)) { + Assert.assertEquals(new ArrayList<>(expectedLookupLoadingSpec.getLookupsToLoad()), lookupsToLoad); + } else { + Assert.assertNull(lookupsToLoad); + } + } else { + Assert.assertEquals(LookupLoadingSpec.Mode.NONE.toString(), lookupLoadingMode); + Assert.assertNull(lookupsToLoad); + } + } + protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree) { Map> counterMap = counterSnapshotsTree.copyMap(); @@ -1189,6 +1214,7 @@ public void verifyResults() verifyCounters(reportPayload.getCounters()); MSQSpec foundSpec = indexingServiceClient.getMSQControllerTask(controllerId).getQuerySpec(); + verifyLookupLoadingInfoInTaskContext(indexingServiceClient.getMSQControllerTask(controllerId).getContext()); log.info( "found generated segments: %s", segmentManager.getAllDataSegments().stream().map(s -> s.toString()).collect( @@ -1416,6 +1442,7 @@ public Pair, List>> throw new ISE("Query %s failed due to %s", sql, payload.getStatus().getErrorReport().toString()); } else { MSQControllerTask msqControllerTask = indexingServiceClient.getMSQControllerTask(controllerId); + verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext()); final MSQSpec spec = msqControllerTask.getQuerySpec(); final List rows; From 26ce2090199ce275684ebe080f018e14eac45483 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 3 May 2024 16:27:18 +0530 Subject: [PATCH 08/14] Fix typo --- .../org/apache/druid/sql/calcite/planner/PlannerContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 99e657ab7023..ef86ee9fee65 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -349,7 +349,7 @@ public String getSchemaResourceType(String schema, String resourceName) } /** - * Returns the set of lookups to laod for a given task. + * Returns the set of lookups to load for a given task. */ public Set getLookupsToLoad() { From e4920f5f82750f671648349616709514e3724a8f Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Mon, 6 May 2024 12:19:14 +0530 Subject: [PATCH 09/14] Address review comments --- .../druid/msq/indexing/MSQControllerTask.java | 1 + .../druid/msq/indexing/MSQWorkerTask.java | 13 +++++++--- .../druid/msq/sql/MSQTaskQueryMaker.java | 13 +++------- .../druid/msq/indexing/MSQWorkerTaskTest.java | 19 ++++++++++++++ .../QueryLookupOperatorConversion.java | 4 +-- .../sql/calcite/planner/PlannerContext.java | 25 +++++++++++++++---- .../planner/SqlResourceCollectorShuttle.java | 4 +-- 7 files changed, 56 insertions(+), 23 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index aeef49503483..bdaf3964b299 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -139,6 +139,7 @@ public MSQControllerTask( this.sqlResultsContext = sqlResultsContext; this.sqlTypeNames = sqlTypeNames; this.nativeTypeNames = nativeTypeNames; + addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index d0944a2cd21b..3aec41911602 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -193,15 +194,19 @@ public int hashCode() @Override public LookupLoadingSpec getLookupLoadingSpec() { - if (!getContext().containsKey(PlannerContext.CTX_LOOKUP_LOADING_MODE) || getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE) == null) { + final Object lookupModeValue = getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE); + if (lookupModeValue == null) { return LookupLoadingSpec.ALL; } - String lookupLoadingMode = getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE).toString(); - if (lookupLoadingMode.equals(LookupLoadingSpec.Mode.NONE.toString())) { + final LookupLoadingSpec.Mode lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString()); + if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) { return LookupLoadingSpec.NONE; - } else if (lookupLoadingMode.equals(LookupLoadingSpec.Mode.ONLY_REQUIRED.toString())) { + } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) { List lookupsToLoad = (List) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); + if (lookupsToLoad == null) { + throw InvalidInput.exception("Set of lookups to load cannot be NULL for mode = ONLY_REQUIRED."); + } return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); } else { return LookupLoadingSpec.ALL; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index d295f445516c..67b62b5ab8cd 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -52,7 +52,6 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.server.QueryResponse; -import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlReplace; @@ -284,15 +283,9 @@ public QueryResponse runQuery(final DruidQuery druidQuery) MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec); - Map context = new HashMap<>(); - if (plannerContext.getLookupsToLoad() != null) { - if (plannerContext.getLookupsToLoad().isEmpty()) { - context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE); - } else { - context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); - context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupsToLoad()); - } - } + final Map context = new HashMap<>(); + context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, plannerContext.getLookupLoadingSpec().getMode()); + context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad()); final MSQControllerTask controllerTask = new MSQControllerTask( taskId, diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 6d00ce539feb..6d3771e770d5 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -21,12 +21,14 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.error.DruidException; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.junit.Assert; import org.junit.Test; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -137,4 +139,21 @@ public void testGetLookupLoadingSpecUsingNonEmptyLookupListInContext() Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, msqWorkerTask.getLookupLoadingSpec().getMode()); Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), msqWorkerTask.getLookupLoadingSpec().getLookupsToLoad()); } + + @Test + public void testGetLookupLoadingSpecUsingInvalidInput() + { + final HashMap context = new HashMap() {{ + put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null); + put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); + }}; + MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + DruidException exception = Assert.assertThrows( + DruidException.class, + msqWorkerTask::getLookupLoadingSpec + ); + Assert.assertEquals( + "Set of lookups to load cannot be NULL for mode = ONLY_REQUIRED.", + exception.getMessage()); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java index 3cd8697a2ddf..8947bd60a01f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java @@ -83,8 +83,8 @@ public DruidExpression toDruidExpression( final String replaceMissingValueWith = getReplaceMissingValueWith(inputExpressions, plannerContext); final String lookupName = (String) lookupNameExpr.getLiteralValue(); - // Collect the lookup names to facilitate selective loading of lookups. - plannerContext.getLookupsToLoad().add(lookupName); + // Add the lookup name to the set of lookups to selectively load. + plannerContext.addLookupToLoad(lookupName); if (arg.isSimpleExtraction() && lookupNameExpr.isLiteral()) { return arg.getSimpleExtraction().cascade( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index ef86ee9fee65..89d6553886fe 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -43,6 +43,7 @@ import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.lookup.RegisteredLookupExtractionFn; import org.apache.druid.segment.join.JoinableFactoryWrapper; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.ResourceAction; @@ -145,8 +146,8 @@ public class PlannerContext // set of attributes for a SQL statement used in the EXPLAIN PLAN output private ExplainAttributes explainAttributes; private PlannerLookupCache lookupCache; - // set of lookups to load for a given task - private final Set lookupsToLoad = new HashSet<>(); + // Lookup loading spec for a given task + private LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.NONE; private PlannerContext( final PlannerToolbox plannerToolbox, @@ -349,11 +350,25 @@ public String getSchemaResourceType(String schema, String resourceName) } /** - * Returns the set of lookups to load for a given task. + * Add a lookup name to load in the lookup loading spec. */ - public Set getLookupsToLoad() + public void addLookupToLoad(String lookupName) { - return lookupsToLoad; + if (lookupLoadingSpec.getLookupsToLoad() == null) { + lookupLoadingSpec = LookupLoadingSpec.loadOnly(Collections.singleton(lookupName)); + } else { + Set existingLookupsToLoad = new HashSet<>(lookupLoadingSpec.getLookupsToLoad()); + existingLookupsToLoad.add(lookupName); + lookupLoadingSpec = LookupLoadingSpec.loadOnly(existingLookupsToLoad); + } + } + + /** + * Returns the lookup loading spec for a given task. + */ + public LookupLoadingSpec getLookupLoadingSpec() + { + return lookupLoadingSpec; } /** diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java index bc91bece6fbf..4300c7d574b7 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java @@ -89,9 +89,9 @@ public SqlNode visit(SqlIdentifier id) final String schema = qualifiedNameParts.get(0); final String resourceName = qualifiedNameParts.get(1); - // Collect the lookup names to facilitate selective loading of lookups. + // Add the lookup name to the set of lookups to selectively load. if (schema.equals(NamedLookupSchema.NAME)) { - plannerContext.getLookupsToLoad().add(resourceName); + plannerContext.addLookupToLoad(resourceName); } final String resourceType = plannerContext.getSchemaResourceType(schema, resourceName); From 191eb4265711c4fcc8bc21a3bec40cf9c94896c0 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Mon, 6 May 2024 13:16:37 +0530 Subject: [PATCH 10/14] Fix test for jdk17 check --- .../org/apache/druid/msq/indexing/MSQWorkerTaskTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 6d3771e770d5..a01b7b6ee3eb 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -143,10 +143,9 @@ public void testGetLookupLoadingSpecUsingNonEmptyLookupListInContext() @Test public void testGetLookupLoadingSpecUsingInvalidInput() { - final HashMap context = new HashMap() {{ - put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null); - put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); - }}; + final HashMap context = new HashMap<>(); + context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null); + context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); DruidException exception = Assert.assertThrows( DruidException.class, From bb4a093311298e03aed3704f37fb3870dbbbc7fb Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Tue, 7 May 2024 13:54:18 +0530 Subject: [PATCH 11/14] Address review comments, update logic for adding lookup names to PlannerContext's LookupLoadingSpec --- .../druid/msq/indexing/MSQWorkerTask.java | 2 +- .../druid/msq/indexing/MSQWorkerTaskTest.java | 2 +- .../lookup/cache/LookupLoadingSpec.java | 34 +++++++++++++++++-- .../lookup/cache/LookupLoadingSpecTest.java | 28 +++++++++++++++ .../QueryLookupOperatorConversion.java | 2 +- .../sql/calcite/planner/PlannerContext.java | 19 ++--------- .../planner/SqlResourceCollectorShuttle.java | 2 +- 7 files changed, 65 insertions(+), 24 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index 3aec41911602..e774ea74205e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -205,7 +205,7 @@ public LookupLoadingSpec getLookupLoadingSpec() } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) { List lookupsToLoad = (List) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); if (lookupsToLoad == null) { - throw InvalidInput.exception("Set of lookups to load cannot be NULL for mode = ONLY_REQUIRED."); + throw InvalidInput.exception("Set of lookups to load cannot be NULL for mode[ONLY_REQUIRED]."); } return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); } else { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index a01b7b6ee3eb..52e8401e216d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -152,7 +152,7 @@ public void testGetLookupLoadingSpecUsingInvalidInput() msqWorkerTask::getLookupLoadingSpec ); Assert.assertEquals( - "Set of lookups to load cannot be NULL for mode = ONLY_REQUIRED.", + "Set of lookups to load cannot be NULL for mode[ONLY_REQUIRED].", exception.getMessage()); } } diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java index 88524fe27f96..6abfdce6f7e4 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java @@ -22,6 +22,9 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.error.InvalidInput; +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.HashSet; import java.util.Set; /** @@ -44,8 +47,8 @@ public enum Mode ALL, NONE, ONLY_REQUIRED } - private final Mode mode; - private final ImmutableSet lookupsToLoad; + private Mode mode; + private Set lookupsToLoad; public static final LookupLoadingSpec ALL = new LookupLoadingSpec(Mode.ALL, null); public static final LookupLoadingSpec NONE = new LookupLoadingSpec(Mode.NONE, null); @@ -56,6 +59,17 @@ private LookupLoadingSpec(Mode mode, Set lookupsToLoad) this.lookupsToLoad = lookupsToLoad == null ? null : ImmutableSet.copyOf(lookupsToLoad); } + /** + * @param mode Allowed values are NONE and ALL. Use {@link LookupLoadingSpec#loadOnly} for creating spec for ONLY_REQUIRED mode. + */ + public static LookupLoadingSpec createSpecFromMode(Mode mode) + { + if (mode == Mode.ONLY_REQUIRED) { + throw InvalidInput.exception("Use different method for creating lookup loading spec with mode[ONLY_REQUIRED]."); + } + return new LookupLoadingSpec(mode, null); + } + /** * Creates a LookupLoadingSpec which loads only the lookups present in the given set. */ @@ -72,12 +86,26 @@ public Mode getMode() return mode; } + /** + * Adds the given lookup name to the lookup loading spec, and also updates the mode to ONLY_REQUIRED. + */ + public void addLookupToLoad(String lookupName) + { + mode = Mode.ONLY_REQUIRED; + if (lookupsToLoad == null) { + lookupsToLoad = new HashSet<>(Collections.singletonList(lookupName)); + } else { + lookupsToLoad.add(lookupName); + } + } + /** * @return A non-null immutable set of lookup names when {@link LookupLoadingSpec#mode} is ONLY_REQUIRED, null otherwise. */ + @Nullable public ImmutableSet getLookupsToLoad() { - return lookupsToLoad; + return lookupsToLoad == null ? null : ImmutableSet.copyOf(lookupsToLoad); } @Override diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index 8d0a7a5518a3..bfd6c04bfe7e 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -53,6 +53,34 @@ public void testLoadingOnlyRequiredLookups() Assert.assertEquals(lookupsToLoad, spec.getLookupsToLoad()); } + @Test + public void testCreateSpecFromMode() + { + LookupLoadingSpec noneSpec = LookupLoadingSpec.createSpecFromMode(LookupLoadingSpec.Mode.NONE); + Assert.assertEquals(LookupLoadingSpec.Mode.NONE, noneSpec.getMode()); + Assert.assertNull(noneSpec.getLookupsToLoad()); + + LookupLoadingSpec allSpec = LookupLoadingSpec.createSpecFromMode(LookupLoadingSpec.Mode.ALL); + Assert.assertEquals(LookupLoadingSpec.Mode.ALL, allSpec.getMode()); + Assert.assertNull(allSpec.getLookupsToLoad()); + } + + @Test + public void testCreateSpecFromModeInvalidInput() + { + DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.createSpecFromMode(LookupLoadingSpec.Mode.ONLY_REQUIRED)); + Assert.assertEquals("Use different method for creating lookup loading spec with mode[ONLY_REQUIRED].", exception.getMessage()); + } + + @Test + public void testAddingLookupToLoad() + { + LookupLoadingSpec spec = LookupLoadingSpec.createSpecFromMode(LookupLoadingSpec.Mode.NONE); + spec.addLookupToLoad("lookupName1"); + Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, spec.getMode()); + Assert.assertEquals(ImmutableSet.of("lookupName1"), spec.getLookupsToLoad()); + } + @Test public void testLoadingOnlyRequiredLookupsWithNullList() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java index 8947bd60a01f..ccc2b8500f6b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java @@ -84,7 +84,7 @@ public DruidExpression toDruidExpression( final String lookupName = (String) lookupNameExpr.getLiteralValue(); // Add the lookup name to the set of lookups to selectively load. - plannerContext.addLookupToLoad(lookupName); + plannerContext.getLookupLoadingSpec().addLookupToLoad(lookupName); if (arg.isSimpleExtraction() && lookupNameExpr.isLiteral()) { return arg.getSimpleExtraction().cascade( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 89d6553886fe..5f2c8a8e5f99 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -62,7 +62,6 @@ import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -146,8 +145,7 @@ public class PlannerContext // set of attributes for a SQL statement used in the EXPLAIN PLAN output private ExplainAttributes explainAttributes; private PlannerLookupCache lookupCache; - // Lookup loading spec for a given task - private LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.NONE; + private final LookupLoadingSpec lookupLoadingSpec; private PlannerContext( final PlannerToolbox plannerToolbox, @@ -182,6 +180,7 @@ private PlannerContext( sqlQueryId = UUID.randomUUID().toString(); } this.sqlQueryId = sqlQueryId; + this.lookupLoadingSpec = LookupLoadingSpec.createSpecFromMode(LookupLoadingSpec.Mode.NONE); } public static PlannerContext create( @@ -349,20 +348,6 @@ public String getSchemaResourceType(String schema, String resourceName) return plannerToolbox.rootSchema().getResourceType(schema, resourceName); } - /** - * Add a lookup name to load in the lookup loading spec. - */ - public void addLookupToLoad(String lookupName) - { - if (lookupLoadingSpec.getLookupsToLoad() == null) { - lookupLoadingSpec = LookupLoadingSpec.loadOnly(Collections.singleton(lookupName)); - } else { - Set existingLookupsToLoad = new HashSet<>(lookupLoadingSpec.getLookupsToLoad()); - existingLookupsToLoad.add(lookupName); - lookupLoadingSpec = LookupLoadingSpec.loadOnly(existingLookupsToLoad); - } - } - /** * Returns the lookup loading spec for a given task. */ diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java index 4300c7d574b7..d21af24537ae 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java @@ -91,7 +91,7 @@ public SqlNode visit(SqlIdentifier id) // Add the lookup name to the set of lookups to selectively load. if (schema.equals(NamedLookupSchema.NAME)) { - plannerContext.addLookupToLoad(resourceName); + plannerContext.getLookupLoadingSpec().addLookupToLoad(resourceName); } final String resourceType = plannerContext.getSchemaResourceType(schema, resourceName); From 40e55aeb168bed93b92842a53013d90248cc68f3 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Tue, 7 May 2024 18:13:52 +0530 Subject: [PATCH 12/14] Use Set in PlannerContext instead of LookupLoadingSpec --- .../lookup/cache/LookupLoadingSpec.java | 34 ++----------------- .../lookup/cache/LookupLoadingSpecTest.java | 28 --------------- .../QueryLookupOperatorConversion.java | 2 +- .../sql/calcite/planner/PlannerContext.java | 16 ++++++--- .../planner/SqlResourceCollectorShuttle.java | 2 +- 5 files changed, 17 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java index 6abfdce6f7e4..88524fe27f96 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java @@ -22,9 +22,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.error.InvalidInput; -import javax.annotation.Nullable; -import java.util.Collections; -import java.util.HashSet; import java.util.Set; /** @@ -47,8 +44,8 @@ public enum Mode ALL, NONE, ONLY_REQUIRED } - private Mode mode; - private Set lookupsToLoad; + private final Mode mode; + private final ImmutableSet lookupsToLoad; public static final LookupLoadingSpec ALL = new LookupLoadingSpec(Mode.ALL, null); public static final LookupLoadingSpec NONE = new LookupLoadingSpec(Mode.NONE, null); @@ -59,17 +56,6 @@ private LookupLoadingSpec(Mode mode, Set lookupsToLoad) this.lookupsToLoad = lookupsToLoad == null ? null : ImmutableSet.copyOf(lookupsToLoad); } - /** - * @param mode Allowed values are NONE and ALL. Use {@link LookupLoadingSpec#loadOnly} for creating spec for ONLY_REQUIRED mode. - */ - public static LookupLoadingSpec createSpecFromMode(Mode mode) - { - if (mode == Mode.ONLY_REQUIRED) { - throw InvalidInput.exception("Use different method for creating lookup loading spec with mode[ONLY_REQUIRED]."); - } - return new LookupLoadingSpec(mode, null); - } - /** * Creates a LookupLoadingSpec which loads only the lookups present in the given set. */ @@ -86,26 +72,12 @@ public Mode getMode() return mode; } - /** - * Adds the given lookup name to the lookup loading spec, and also updates the mode to ONLY_REQUIRED. - */ - public void addLookupToLoad(String lookupName) - { - mode = Mode.ONLY_REQUIRED; - if (lookupsToLoad == null) { - lookupsToLoad = new HashSet<>(Collections.singletonList(lookupName)); - } else { - lookupsToLoad.add(lookupName); - } - } - /** * @return A non-null immutable set of lookup names when {@link LookupLoadingSpec#mode} is ONLY_REQUIRED, null otherwise. */ - @Nullable public ImmutableSet getLookupsToLoad() { - return lookupsToLoad == null ? null : ImmutableSet.copyOf(lookupsToLoad); + return lookupsToLoad; } @Override diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index bfd6c04bfe7e..8d0a7a5518a3 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -53,34 +53,6 @@ public void testLoadingOnlyRequiredLookups() Assert.assertEquals(lookupsToLoad, spec.getLookupsToLoad()); } - @Test - public void testCreateSpecFromMode() - { - LookupLoadingSpec noneSpec = LookupLoadingSpec.createSpecFromMode(LookupLoadingSpec.Mode.NONE); - Assert.assertEquals(LookupLoadingSpec.Mode.NONE, noneSpec.getMode()); - Assert.assertNull(noneSpec.getLookupsToLoad()); - - LookupLoadingSpec allSpec = LookupLoadingSpec.createSpecFromMode(LookupLoadingSpec.Mode.ALL); - Assert.assertEquals(LookupLoadingSpec.Mode.ALL, allSpec.getMode()); - Assert.assertNull(allSpec.getLookupsToLoad()); - } - - @Test - public void testCreateSpecFromModeInvalidInput() - { - DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.createSpecFromMode(LookupLoadingSpec.Mode.ONLY_REQUIRED)); - Assert.assertEquals("Use different method for creating lookup loading spec with mode[ONLY_REQUIRED].", exception.getMessage()); - } - - @Test - public void testAddingLookupToLoad() - { - LookupLoadingSpec spec = LookupLoadingSpec.createSpecFromMode(LookupLoadingSpec.Mode.NONE); - spec.addLookupToLoad("lookupName1"); - Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, spec.getMode()); - Assert.assertEquals(ImmutableSet.of("lookupName1"), spec.getLookupsToLoad()); - } - @Test public void testLoadingOnlyRequiredLookupsWithNullList() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java index ccc2b8500f6b..8947bd60a01f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java @@ -84,7 +84,7 @@ public DruidExpression toDruidExpression( final String lookupName = (String) lookupNameExpr.getLiteralValue(); // Add the lookup name to the set of lookups to selectively load. - plannerContext.getLookupLoadingSpec().addLookupToLoad(lookupName); + plannerContext.addLookupToLoad(lookupName); if (arg.isSimpleExtraction() && lookupNameExpr.isLiteral()) { return arg.getSimpleExtraction().cascade( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 5f2c8a8e5f99..99f721bffaa7 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -62,6 +62,7 @@ import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -145,7 +146,7 @@ public class PlannerContext // set of attributes for a SQL statement used in the EXPLAIN PLAN output private ExplainAttributes explainAttributes; private PlannerLookupCache lookupCache; - private final LookupLoadingSpec lookupLoadingSpec; + private final Set lookupsToLoad = new HashSet<>(); private PlannerContext( final PlannerToolbox plannerToolbox, @@ -180,7 +181,6 @@ private PlannerContext( sqlQueryId = UUID.randomUUID().toString(); } this.sqlQueryId = sqlQueryId; - this.lookupLoadingSpec = LookupLoadingSpec.createSpecFromMode(LookupLoadingSpec.Mode.NONE); } public static PlannerContext create( @@ -349,11 +349,19 @@ public String getSchemaResourceType(String schema, String resourceName) } /** - * Returns the lookup loading spec for a given task. + * Adds the given lookup name to the lookup loading spec. + */ + public void addLookupToLoad(String lookupName) + { + lookupsToLoad.add(lookupName); + } + + /** + * Returns the lookup to load for a given task. */ public LookupLoadingSpec getLookupLoadingSpec() { - return lookupLoadingSpec; + return lookupsToLoad.isEmpty() ? LookupLoadingSpec.NONE : LookupLoadingSpec.loadOnly(lookupsToLoad); } /** diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java index d21af24537ae..4300c7d574b7 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java @@ -91,7 +91,7 @@ public SqlNode visit(SqlIdentifier id) // Add the lookup name to the set of lookups to selectively load. if (schema.equals(NamedLookupSchema.NAME)) { - plannerContext.getLookupLoadingSpec().addLookupToLoad(resourceName); + plannerContext.addLookupToLoad(resourceName); } final String resourceType = plannerContext.getSchemaResourceType(schema, resourceName); From ff8e098db8343e4b449b5ef323e2c5e93f07b37c Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 8 May 2024 09:18:50 +0530 Subject: [PATCH 13/14] Address review comments --- .../druid/msq/indexing/MSQWorkerTask.java | 4 ++-- .../druid/msq/sql/MSQTaskQueryMaker.java | 5 ++++- .../druid/msq/indexing/MSQWorkerTaskTest.java | 20 +++++++++++++++++-- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index e774ea74205e..49c0f7989c2f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -204,8 +204,8 @@ public LookupLoadingSpec getLookupLoadingSpec() return LookupLoadingSpec.NONE; } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) { List lookupsToLoad = (List) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); - if (lookupsToLoad == null) { - throw InvalidInput.exception("Set of lookups to load cannot be NULL for mode[ONLY_REQUIRED]."); + if (lookupsToLoad == null || lookupsToLoad.isEmpty()) { + throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad); } return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); } else { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 67b62b5ab8cd..533010c30575 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -52,6 +52,7 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.server.QueryResponse; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlReplace; @@ -285,7 +286,9 @@ public QueryResponse runQuery(final DruidQuery druidQuery) final Map context = new HashMap<>(); context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, plannerContext.getLookupLoadingSpec().getMode()); - context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad()); + if (plannerContext.getLookupLoadingSpec().getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED) { + context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad()); + } final MSQControllerTask controllerTask = new MSQControllerTask( taskId, diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 52e8401e216d..601c06ec9fbf 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -28,6 +28,7 @@ import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -144,15 +145,30 @@ public void testGetLookupLoadingSpecUsingNonEmptyLookupListInContext() public void testGetLookupLoadingSpecUsingInvalidInput() { final HashMap context = new HashMap<>(); - context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null); context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); + + // Setting CTX_LOOKUPS_TO_LOAD as null + context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null); + MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); DruidException exception = Assert.assertThrows( DruidException.class, msqWorkerTask::getLookupLoadingSpec ); Assert.assertEquals( - "Set of lookups to load cannot be NULL for mode[ONLY_REQUIRED].", + "Set of lookups to load cannot be null for mode[ONLY_REQUIRED].", + exception.getMessage()); + + // Setting CTX_LOOKUPS_TO_LOAD as empty list + context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList()); + + msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + exception = Assert.assertThrows( + DruidException.class, + msqWorkerTask::getLookupLoadingSpec + ); + Assert.assertEquals( + "Set of lookups to load cannot be [] for mode[ONLY_REQUIRED].", exception.getMessage()); } } From 2aaace00a2938b4e5c341f565e973f1f5a994597 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 9 May 2024 08:38:37 +0530 Subject: [PATCH 14/14] Address review comments --- .../druid/msq/indexing/MSQWorkerTask.java | 4 +-- .../druid/msq/indexing/MSQWorkerTaskTest.java | 14 ++++----- .../apache/druid/msq/test/MSQTestBase.java | 30 +++---------------- 3 files changed, 13 insertions(+), 35 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index 49c0f7989c2f..a23c62881a00 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -43,8 +43,8 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nonnull; +import java.util.Collection; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -203,7 +203,7 @@ public LookupLoadingSpec getLookupLoadingSpec() if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) { return LookupLoadingSpec.NONE; } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) { - List lookupsToLoad = (List) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); + Collection lookupsToLoad = (Collection) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); if (lookupsToLoad == null || lookupsToLoad.isEmpty()) { throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 601c06ec9fbf..5e79b129f3bd 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -123,7 +123,7 @@ public void testGetDefaultLookupLoadingSpec() } @Test - public void testGetLookupLoadingSpecUsingLookupLoadingModeNoneInContext() + public void testGetLookupLoadingWithModeNoneInContext() { final ImmutableMap context = ImmutableMap.of(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE); MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); @@ -131,7 +131,7 @@ public void testGetLookupLoadingSpecUsingLookupLoadingModeNoneInContext() } @Test - public void testGetLookupLoadingSpecUsingNonEmptyLookupListInContext() + public void testGetLookupLoadingSpecWithLookupListInContext() { final ImmutableMap context = ImmutableMap.of( PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), @@ -142,7 +142,7 @@ public void testGetLookupLoadingSpecUsingNonEmptyLookupListInContext() } @Test - public void testGetLookupLoadingSpecUsingInvalidInput() + public void testGetLookupLoadingSpecWithInvalidInput() { final HashMap context = new HashMap<>(); context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); @@ -150,10 +150,10 @@ public void testGetLookupLoadingSpecUsingInvalidInput() // Setting CTX_LOOKUPS_TO_LOAD as null context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null); - MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + MSQWorkerTask taskWithNullLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); DruidException exception = Assert.assertThrows( DruidException.class, - msqWorkerTask::getLookupLoadingSpec + taskWithNullLookups::getLookupLoadingSpec ); Assert.assertEquals( "Set of lookups to load cannot be null for mode[ONLY_REQUIRED].", @@ -162,10 +162,10 @@ public void testGetLookupLoadingSpecUsingInvalidInput() // Setting CTX_LOOKUPS_TO_LOAD as empty list context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList()); - msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + MSQWorkerTask taskWithEmptyLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); exception = Assert.assertThrows( DruidException.class, - msqWorkerTask::getLookupLoadingSpec + taskWithEmptyLookups::getLookupLoadingSpec ); Assert.assertEquals( "Set of lookups to load cannot be [] for mode[ONLY_REQUIRED].", diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 391b825d05e0..cdaafc75c60e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -119,7 +119,6 @@ import org.apache.druid.msq.util.SqlStatementResourceHelper; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ForwardingQueryProcessingPool; -import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -809,34 +808,12 @@ private MSQErrorReport getErrorReportOrThrow(String controllerTaskId) private void assertMSQSpec(MSQSpec expectedMSQSpec, MSQSpec querySpecForTask) { - assertMSQSpecQuery(expectedMSQSpec.getQuery(), querySpecForTask.getQuery()); + Assert.assertEquals(expectedMSQSpec.getQuery(), querySpecForTask.getQuery()); Assert.assertEquals(expectedMSQSpec.getAssignmentStrategy(), querySpecForTask.getAssignmentStrategy()); Assert.assertEquals(expectedMSQSpec.getColumnMappings(), querySpecForTask.getColumnMappings()); Assert.assertEquals(expectedMSQSpec.getDestination(), querySpecForTask.getDestination()); } - private void assertMSQSpecQuery(Query msqSpecQuery, Query taskSpecQuery) - { - Assert.assertEquals(msqSpecQuery.getId(), taskSpecQuery.getId()); - Assert.assertEquals(msqSpecQuery.getType(), taskSpecQuery.getType()); - Assert.assertEquals(msqSpecQuery.getSubQueryId(), taskSpecQuery.getSubQueryId()); - Assert.assertEquals(msqSpecQuery.getSqlQueryId(), taskSpecQuery.getSqlQueryId()); - Assert.assertEquals(msqSpecQuery.getIntervals(), taskSpecQuery.getIntervals()); - Assert.assertEquals(msqSpecQuery.getDataSource(), taskSpecQuery.getDataSource()); - Assert.assertEquals(msqSpecQuery.getFilter(), taskSpecQuery.getFilter()); - Assert.assertEquals(msqSpecQuery.getDuration(), taskSpecQuery.getDuration()); - Assert.assertEquals(msqSpecQuery.getGranularity(), taskSpecQuery.getGranularity()); - Assert.assertEquals(msqSpecQuery.getTimezone(), taskSpecQuery.getTimezone()); - Assert.assertEquals(msqSpecQuery.getRequiredColumns(), taskSpecQuery.getRequiredColumns()); - Assert.assertEquals(msqSpecQuery.getVirtualColumns(), taskSpecQuery.getVirtualColumns()); - - // taskSpecQuery's context should have all key-value pairs from msqSpecQuery's context. - Map msqSpecQueryContext = msqSpecQuery.getContext(); - for (Map.Entry entry : msqSpecQueryContext.entrySet()) { - Assert.assertEquals(msqSpecQueryContext.get(entry.getKey()), taskSpecQuery.getContext().get(entry.getKey())); - } - } - private void assertTuningConfig( MSQTuningConfig expectedTuningConfig, MSQTuningConfig tuningConfig @@ -1213,8 +1190,9 @@ public void verifyResults() verifyWorkerCount(reportPayload.getCounters()); verifyCounters(reportPayload.getCounters()); - MSQSpec foundSpec = indexingServiceClient.getMSQControllerTask(controllerId).getQuerySpec(); - verifyLookupLoadingInfoInTaskContext(indexingServiceClient.getMSQControllerTask(controllerId).getContext()); + MSQControllerTask msqControllerTask = indexingServiceClient.getMSQControllerTask(controllerId); + MSQSpec foundSpec = msqControllerTask.getQuerySpec(); + verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext()); log.info( "found generated segments: %s", segmentManager.getAllDataSegments().stream().map(s -> s.toString()).collect(