From 08c9ec2b738cd3d52fbf7fc93d4c198fd97fd22c Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Mon, 9 Sep 2024 10:01:28 -0400 Subject: [PATCH 01/10] Initial support for selective loading of broadcast datasources. --- .../indexing/common/task/CompactionTask.java | 6 +- .../common/task/KillUnusedSegmentsTask.java | 7 + .../druid/indexing/common/task/Task.java | 6 + .../common/task/CompactionTaskTest.java | 29 +++ .../task/KillUnusedSegmentsTaskTest.java | 11 ++ .../coordination/BroadcastLoadingSpec.java | 166 ++++++++++++++++++ .../coordination/SegmentBootstrapper.java | 35 +++- .../metrics/DataSourceTaskIdHolder.java | 15 +- .../BroadcastLoadingSpecTest.java | 166 ++++++++++++++++++ .../SegmentBootstrapperCacheTest.java | 9 +- .../coordination/SegmentBootstrapperTest.java | 156 +++++++++++++++- .../java/org/apache/druid/cli/CliPeon.java | 9 + 12 files changed, 599 insertions(+), 16 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/coordination/BroadcastLoadingSpec.java create mode 100644 server/src/test/java/org/apache/druid/server/coordination/BroadcastLoadingSpecTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 73c8a35405c4..eebf55869522 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -89,6 +89,7 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.apache.druid.server.coordination.BroadcastLoadingSpec; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.duty.CompactSegments; import 
org.apache.druid.server.lookup.cache.LookupLoadingSpec; @@ -240,12 +241,13 @@ public CompactionTask( : compactionRunner; this.currentSubTaskHolder = this.compactionRunner.getCurrentSubTaskHolder(); - // Do not load any lookups in sub-tasks launched by compaction task, unless transformSpec is present. + // Do not load any lookups and broadcast segments in sub-tasks launched by compaction task, unless transformSpec is present. // If transformSpec is present, we will not modify the context so that the sub-tasks can make the - // decision based on context values, loading all lookups by default. + // decision based on context values, loading all lookups and broadcast segments by default. // This is done to ensure backward compatibility since transformSpec can reference lookups. if (transformSpec == null) { addToContextIfAbsent(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); + addToContextIfAbsent(BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, BroadcastLoadingSpec.Mode.NONE.toString()); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index e1f6d2915eea..7c50ea13b2cd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordination.BroadcastLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; @@ -412,6 +413,12 @@ public LookupLoadingSpec getLookupLoadingSpec() return 
LookupLoadingSpec.NONE; } + @Override + public BroadcastLoadingSpec getBroadcastDatasourcesLoadingSpec() + { + return BroadcastLoadingSpec.NONE; + } + @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 9b882e2e8d2b..867b882c4428 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; +import org.apache.druid.server.coordination.BroadcastLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; @@ -332,4 +333,9 @@ default LookupLoadingSpec getLookupLoadingSpec() { return LookupLoadingSpec.createFromContext(getContext(), LookupLoadingSpec.ALL); } + + default BroadcastLoadingSpec getBroadcastDatasourcesLoadingSpec() + { + return BroadcastLoadingSpec.createFromContext(getContext(), BroadcastLoadingSpec.ALL); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index c1bf649980f6..9d1cc341bb59 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -129,6 +129,7 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.selector.settable.SettableColumnValueSelector; import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.coordination.BroadcastLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthorizerMapper; @@ -1606,6 +1607,34 @@ public void testGetDefaultLookupLoadingSpecWithTransformSpec() Assert.assertEquals(LookupLoadingSpec.ALL, task.getLookupLoadingSpec()); } + @Test + public void testGetDefaultBroadcastLoadingSpec() + { + final Builder builder = new Builder( + DATA_SOURCE, + segmentCacheManagerFactory + ); + final CompactionTask task = builder + .interval(Intervals.of("2000-01-01/2000-01-02")) + .build(); + Assert.assertEquals(BroadcastLoadingSpec.NONE, task.getBroadcastDatasourcesLoadingSpec()); + } + + @Test + public void testGetDefaultBroadcastLoadingSpecWithTransformSpec() + { + final Builder builder = new Builder( + DATA_SOURCE, + segmentCacheManagerFactory + ); + final CompactionTask task = builder + .interval(Intervals.of("2000-01-01/2000-01-02")) + .transformSpec(new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null))) + .build(); + Assert.assertEquals(BroadcastLoadingSpec.ALL, task.getBroadcastDatasourcesLoadingSpec()); + } + + private Granularity chooseFinestGranularityHelper(List granularities) { SettableSupplier queryGranularity = new SettableSupplier<>(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index fe2b5a51c86a..b42565f72e68 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.Intervals; import 
org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase; +import org.apache.druid.server.coordination.BroadcastLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.timeline.DataSegment; import org.assertj.core.api.Assertions; @@ -601,6 +602,16 @@ public void testGetLookupsToLoad() Assert.assertEquals(LookupLoadingSpec.Mode.NONE, task.getLookupLoadingSpec().getMode()); } + @Test + public void testGetBroadcastDatasourcesToLoad() + { + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2019-03-01/2019-04-01")) + .build(); + Assert.assertEquals(BroadcastLoadingSpec.Mode.NONE, task.getBroadcastDatasourcesLoadingSpec().getMode()); + } + @Test public void testKillBatchSizeOneAndLimit4() throws Exception { diff --git a/server/src/main/java/org/apache/druid/server/coordination/BroadcastLoadingSpec.java b/server/src/main/java/org/apache/druid/server/coordination/BroadcastLoadingSpec.java new file mode 100644 index 000000000000..7f4adbf4db7b --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordination/BroadcastLoadingSpec.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.error.InvalidInput; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * This class defines the spec for loading of broadcastDatasources for a given task. It contains 2 fields: + *
 + * <ol>
 + *   <li>{@link BroadcastLoadingSpec#mode}: This mode defines whether broadcastDatasources need to be
 + *   loaded for the given task, or not. It can take 3 values:
 + *   <ul>
 + *     <li>ALL: Load all the broadcastDatasources.</li>
 + *     <li>NONE: Load no broadcastDatasources.</li>
 + *     <li>ONLY_REQUIRED: Load only the broadcastDatasources defined in broadcastDatasourcesToLoad.</li>
 + *   </ul>
 + *   </li>
 + *   <li>{@link BroadcastLoadingSpec#broadcastDatasourcesToLoad}: Defines the broadcastDatasources to load when the broadcastDatasourceLoadingMode is set to ONLY_REQUIRED.</li>
 + * </ol>
 + *
+ */ +public class BroadcastLoadingSpec +{ + + public static final String CTX_BROADCAST_DATASOURCES_LOADING_MODE = "broadcastDatasourceLoadingMode"; + public static final String CTX_BROADCAST_DATASOURCES_TO_LOAD = "broadcastDatasourcesToLoad"; + + public enum Mode + { + ALL, NONE, ONLY_REQUIRED + } + + private final Mode mode; + @Nullable + private final ImmutableSet broadcastDatasourcesToLoad; + + public static final BroadcastLoadingSpec ALL = new BroadcastLoadingSpec(Mode.ALL, null); + public static final BroadcastLoadingSpec NONE = new BroadcastLoadingSpec(Mode.NONE, null); + + private BroadcastLoadingSpec(Mode mode, @Nullable Set broadcastDatasourcesToLoad) + { + this.mode = mode; + this.broadcastDatasourcesToLoad = broadcastDatasourcesToLoad == null ? null : ImmutableSet.copyOf(broadcastDatasourcesToLoad); + } + + /** + * Creates a broadcastDatasourceLoadingSpec which loads only the broadcastDatasources present in the given set. + */ + public static BroadcastLoadingSpec loadOnly(Set broadcastDatasourcesToLoad) + { + if (broadcastDatasourcesToLoad == null) { + throw InvalidInput.exception("Expected non-null set of broadcastDatasources to load."); + } + return new BroadcastLoadingSpec(Mode.ONLY_REQUIRED, broadcastDatasourcesToLoad); + } + + public Mode getMode() + { + return mode; + } + + /** + * @return A non-null immutable set of broadcastDatasource names when {@link BroadcastLoadingSpec#mode} is ONLY_REQUIRED, null otherwise. 
+ */ + public ImmutableSet getbroadcastDatasourcesToLoad() + { + return broadcastDatasourcesToLoad; + } + + public static BroadcastLoadingSpec createFromContext(Map context, BroadcastLoadingSpec defaultSpec) + { + if (context == null) { + return defaultSpec; + } + + final Object broadcastDatasourceModeValue = context.get(CTX_BROADCAST_DATASOURCES_LOADING_MODE); + if (broadcastDatasourceModeValue == null) { + return defaultSpec; + } + + final BroadcastLoadingSpec.Mode broadcastDatasourceLoadingMode; + try { + broadcastDatasourceLoadingMode = BroadcastLoadingSpec.Mode.valueOf(broadcastDatasourceModeValue.toString()); + } + catch (IllegalArgumentException e) { + throw InvalidInput.exception("Invalid value of %s[%s]. Allowed values are %s", + CTX_BROADCAST_DATASOURCES_LOADING_MODE, broadcastDatasourceModeValue.toString(), Arrays.asList( + BroadcastLoadingSpec.Mode.values())); + } + + if (broadcastDatasourceLoadingMode == Mode.NONE) { + return NONE; + } else if (broadcastDatasourceLoadingMode == Mode.ALL) { + return ALL; + } else if (broadcastDatasourceLoadingMode == Mode.ONLY_REQUIRED) { + Collection broadcastDatasourcesToLoad; + try { + broadcastDatasourcesToLoad = (Collection) context.get(CTX_BROADCAST_DATASOURCES_TO_LOAD); + } + catch (ClassCastException e) { + throw InvalidInput.exception("Invalid value of %s[%s]. Please provide a comma-separated list of " + + "broadcastDatasource names. 
For example: [\"broadcastDatasourceName1\", \"broadcastDatasourceName2\"]", + CTX_BROADCAST_DATASOURCES_TO_LOAD, context.get(CTX_BROADCAST_DATASOURCES_TO_LOAD)); + } + + if (broadcastDatasourcesToLoad == null || broadcastDatasourcesToLoad.isEmpty()) { + throw InvalidInput.exception("Set of broadcastDatasources to load cannot be %s for mode[ONLY_REQUIRED].", broadcastDatasourcesToLoad); + } + return BroadcastLoadingSpec.loadOnly(new HashSet<>(broadcastDatasourcesToLoad)); + } else { + return defaultSpec; + } + } + + @Override + public String toString() + { + return "broadcastDatasourceLoadingSpec{" + + "mode=" + mode + + ", broadcastDatasourcesToLoad=" + broadcastDatasourcesToLoad + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BroadcastLoadingSpec that = (BroadcastLoadingSpec) o; + return mode == that.mode && Objects.equals(broadcastDatasourcesToLoad, that.broadcastDatasourcesToLoad); + } + + @Override + public int hashCode() + { + return Objects.hash(mode, broadcastDatasourcesToLoad); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java index c5b71fbcddcf..ffeef623a082 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java @@ -21,11 +21,13 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import org.apache.druid.client.BootstrapSegmentsResponse; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.error.DruidException; 
import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ServerTypeConfig; import org.apache.druid.java.util.common.ISE; @@ -39,12 +41,14 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -55,6 +59,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * Responsible for bootstrapping segments already cached on disk and bootstrap segments fetched from the coordinator. 
@@ -80,6 +85,8 @@ public class SegmentBootstrapper private static final EmittingLogger log = new EmittingLogger(SegmentBootstrapper.class); + private final DataSourceTaskIdHolder datasourceHolder; + @Inject public SegmentBootstrapper( SegmentLoadDropHandler loadDropHandler, @@ -89,7 +96,8 @@ public SegmentBootstrapper( SegmentManager segmentManager, ServerTypeConfig serverTypeConfig, CoordinatorClient coordinatorClient, - ServiceEmitter emitter + ServiceEmitter emitter, + DataSourceTaskIdHolder datasourceHolder ) { this.loadDropHandler = loadDropHandler; @@ -100,6 +108,8 @@ public SegmentBootstrapper( this.serverTypeConfig = serverTypeConfig; this.coordinatorClient = coordinatorClient; this.emitter = emitter; + this.datasourceHolder = datasourceHolder; + log.info("Datasource holder broadcastLoadingSpec:[%s] and lookupLoadingSpec:[%s]", datasourceHolder.getBroadcastLoadingSpec(), datasourceHolder.getLookupLoadingSpec()); } @LifecycleStart @@ -260,10 +270,16 @@ private void loadSegmentsOnStartup() throws IOException } /** - * @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned. + * @return a list of bootstrap segments based on {@link DataSourceTaskIdHolder#getBroadcastLoadingSpec()}. When bootstrap segments cannot be found, an empty list is returned. */ private List getBootstrapSegments() { + final BroadcastLoadingSpec.Mode mode = datasourceHolder.getBroadcastLoadingSpec().getMode(); + if (mode == BroadcastLoadingSpec.Mode.NONE) { + log.info("Broadcast datasource loading mode is NONE. Not fetching any bootstrap segments."); + return ImmutableList.of(); 
+ } + + log.info("Fetching bootstrap segments from the coordinator."); + final Stopwatch stopwatch = Stopwatch.createStarted(); @@ -272,7 +288,19 @@ private List getBootstrapSegments() try { final BootstrapSegmentsResponse response = FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true); - bootstrapSegments = ImmutableList.copyOf(response.getIterator()); + if (mode == BroadcastLoadingSpec.Mode.ONLY_REQUIRED) { + final Set broadcastDatasourcesToLoad = datasourceHolder.getBroadcastLoadingSpec().getbroadcastDatasourcesToLoad(); + final List filteredBroadcast = new ArrayList<>(); + response.getIterator().forEachRemaining(segment -> { + if (broadcastDatasourcesToLoad.contains(segment.getDataSource())) { + filteredBroadcast.add(segment); + } + }); + bootstrapSegments = filteredBroadcast; + log.info("Filtered the bootstrap segments down to [%d] segments belonging to the required broadcast datasources.", bootstrapSegments.size()); + } else { + bootstrapSegments = ImmutableList.copyOf(response.getIterator()); + } } catch (Exception e) { log.warn("Error fetching bootstrap segments from the coordinator: [%s]. 
", e.getMessage()); @@ -284,7 +312,6 @@ private List getBootstrapSegments() emitter.emit(new ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count", bootstrapSegments.size())); log.info("Fetched [%d] bootstrap segments in [%d]ms.", bootstrapSegments.size(), fetchRunMillis); } - return bootstrapSegments; } diff --git a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java index 6d2dafd31a55..bb698ce46f0b 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java +++ b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java @@ -21,15 +21,16 @@ import com.google.inject.Inject; import com.google.inject.name.Named; +import org.apache.druid.server.coordination.BroadcastLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; -import javax.annotation.Nullable; - public class DataSourceTaskIdHolder { public static final String DATA_SOURCE_BINDING = "druidDataSource"; public static final String TASK_ID_BINDING = "druidTaskId"; public static final String LOOKUPS_TO_LOAD_FOR_TASK = "lookupsToLoadForTask"; + public static final String BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK = "broadcastDatasourcesToLoadForTask"; + @Named(DATA_SOURCE_BINDING) @Inject(optional = true) String dataSource = null; @@ -37,11 +38,14 @@ public class DataSourceTaskIdHolder @Inject(optional = true) String taskId = null; - @Nullable @Named(LOOKUPS_TO_LOAD_FOR_TASK) @Inject(optional = true) LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.ALL; + @Named(BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) + @Inject(optional = true) + BroadcastLoadingSpec broadcastLoadingSpec = BroadcastLoadingSpec.ALL; + public String getDataSource() { return dataSource; @@ -56,4 +60,9 @@ public LookupLoadingSpec getLookupLoadingSpec() { return lookupLoadingSpec; } + + public BroadcastLoadingSpec getBroadcastLoadingSpec() + { + return 
broadcastLoadingSpec; + } } diff --git a/server/src/test/java/org/apache/druid/server/coordination/BroadcastLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/coordination/BroadcastLoadingSpecTest.java new file mode 100644 index 000000000000..b52e33432805 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordination/BroadcastLoadingSpecTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordination; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.Set; + +@RunWith(JUnitParamsRunner.class) +public class BroadcastLoadingSpecTest +{ + @Test + public void testLoadingAllBroadcastDatasources() + { + final BroadcastLoadingSpec spec = BroadcastLoadingSpec.ALL; + Assert.assertEquals(BroadcastLoadingSpec.Mode.ALL, spec.getMode()); + Assert.assertNull(spec.getbroadcastDatasourcesToLoad()); + } + + @Test + public void testLoadingNoLookups() + { + final BroadcastLoadingSpec spec = BroadcastLoadingSpec.NONE; + Assert.assertEquals(BroadcastLoadingSpec.Mode.NONE, spec.getMode()); + Assert.assertNull(spec.getbroadcastDatasourcesToLoad()); + } + + @Test + public void testLoadingOnlyRequiredLookups() + { + final Set broadcastDatasourcesToLoad = ImmutableSet.of("ds1", "ds2"); + final BroadcastLoadingSpec spec = BroadcastLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2")); + Assert.assertEquals(BroadcastLoadingSpec.Mode.ONLY_REQUIRED, spec.getMode()); + Assert.assertEquals(broadcastDatasourcesToLoad, spec.getbroadcastDatasourcesToLoad()); + } + + @Test + public void testLoadingOnlyRequiredLookupsWithNullList() + { + DruidException exception = Assert.assertThrows(DruidException.class, () -> BroadcastLoadingSpec.loadOnly(null)); + Assert.assertEquals("Expected non-null set of broadcastDatasources to load.", exception.getMessage()); + } + + @Test + public void testCreateBroadcastLoadingSpecFromNullContext() + { + // Default spec is returned in the case of context=null. 
+ Assert.assertEquals( + BroadcastLoadingSpec.NONE, + BroadcastLoadingSpec.createFromContext( + null, + BroadcastLoadingSpec.NONE + ) + ); + + Assert.assertEquals( + BroadcastLoadingSpec.ALL, + BroadcastLoadingSpec.createFromContext( + null, + BroadcastLoadingSpec.ALL + ) + ); + } + + @Test + public void testCreateBroadcastLoadingSpecFromContext() + { + // Only required lookups are returned in the case of context having the lookup keys. + Assert.assertEquals( + BroadcastLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2")), + BroadcastLoadingSpec.createFromContext( + ImmutableMap.of( + BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, Arrays.asList("ds1", "ds2"), + BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, BroadcastLoadingSpec.Mode.ONLY_REQUIRED + ), + BroadcastLoadingSpec.ALL + ) + ); + + // No lookups are returned in the case of context having mode=NONE, irrespective of the default spec. + Assert.assertEquals( + BroadcastLoadingSpec.NONE, + BroadcastLoadingSpec.createFromContext( + ImmutableMap.of( + BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, BroadcastLoadingSpec.Mode.NONE), + BroadcastLoadingSpec.ALL + ) + ); + + // All lookups are returned in the case of context having mode=ALL, irrespective of the default spec. 
+ Assert.assertEquals( + BroadcastLoadingSpec.ALL, + BroadcastLoadingSpec.createFromContext( + ImmutableMap.of(BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, BroadcastLoadingSpec.Mode.ALL), + BroadcastLoadingSpec.NONE + ) + ); + } + + @Test + @Parameters( + { + "NONE1", + "A", + "Random mode", + "all", + "only required", + "none" + } + ) + public void testCreateLookupLoadingSpecFromInvalidModeInContext(final String mode) + { + final DruidException exception = Assert.assertThrows(DruidException.class, () -> BroadcastLoadingSpec.createFromContext( + ImmutableMap.of(BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, mode), BroadcastLoadingSpec.ALL)); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Allowed values are [ALL, NONE, ONLY_REQUIRED]", + BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, mode), exception.getMessage()); + } + + + @Test + @Parameters( + { + "foo bar", + "foo]" + } + ) + public void testCreateLookupLoadingSpecFromInvalidLookupsInContext(Object lookupsToLoad) + { + final DruidException exception = Assert.assertThrows(DruidException.class, () -> + BroadcastLoadingSpec.createFromContext( + ImmutableMap.of( + BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, lookupsToLoad, + BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, BroadcastLoadingSpec.Mode.ONLY_REQUIRED), + BroadcastLoadingSpec.ALL) + ); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Please provide a comma-separated list of " + + "broadcastDatasource names. 
For example: [\"broadcastDatasourceName1\", \"broadcastDatasourceName2\"]", + BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, lookupsToLoad), exception.getMessage()); + } +} \ No newline at end of file diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java index 7629a6b875c8..2c98a44ffc50 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java @@ -137,7 +137,8 @@ public void testLoadStartStopWithEmptyLocations() throws IOException segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - emitter + emitter, + new org.apache.druid.server.metrics.DataSourceTaskIdHolder() ); bootstrapper.start(); @@ -164,7 +165,8 @@ public void testLoadStartStop() throws IOException segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - emitter + emitter, + new org.apache.druid.server.metrics.DataSourceTaskIdHolder() ); bootstrapper.start(); @@ -204,7 +206,8 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - emitter + emitter, + new org.apache.druid.server.metrics.DataSourceTaskIdHolder() ); bootstrapper.start(); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java index c41763f18245..6df9d47400ae 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java @@ -20,13 +20,28 @@ package org.apache.druid.server.coordination; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Scopes; +import 
com.google.inject.TypeLiteral; +import com.google.inject.name.Names; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.initialization.ServerInjectorBuilder; +import org.apache.druid.jackson.JacksonModule; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.druid.server.metrics.MetricsModule; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Before; @@ -34,11 +49,15 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import javax.validation.Validation; +import javax.validation.Validator; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Properties; import java.util.Set; import static org.apache.druid.server.TestSegmentUtils.makeSegment; @@ -125,7 +144,8 @@ public void testStartStop() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -184,7 +204,8 @@ public void testLoadCachedSegments() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); 
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -240,7 +261,8 @@ public void testLoadBootstrapSegments() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -267,6 +289,131 @@ public void testLoadBootstrapSegments() throws Exception bootstrapper.stop(); } + @Test + public void testLoadNoBootstrapSegments() throws Exception + { + final Set segments = new HashSet<>(); + for (int i = 0; i < COUNT; ++i) { + segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01"))); + segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02"))); + segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01"))); + segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02"))); + } + + Injector injector = Guice.createInjector( + new JacksonModule(), + new LifecycleModule(), + binder -> { + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + final BroadcastLoadingSpec broadcastMode = BroadcastLoadingSpec.NONE; + binder.bind(Key.get(BroadcastLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) + .toInstance(broadcastMode); + } + ); + + final TestCoordinatorClient coordinatorClient = new TestCoordinatorClient(segments); + final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager(); + final SegmentManager segmentManager = new SegmentManager(cacheManager); + final SegmentLoadDropHandler handler = new SegmentLoadDropHandler( + segmentLoaderConfig, + segmentAnnouncer, + segmentManager + ); + final SegmentBootstrapper bootstrapper = new SegmentBootstrapper( + handler, + segmentLoaderConfig, + segmentAnnouncer, + serverAnnouncer, + segmentManager, + new ServerTypeConfig(ServerType.HISTORICAL), + coordinatorClient, + serviceEmitter, + 
injector.getInstance(DataSourceTaskIdHolder.class) + ); + + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); + + bootstrapper.start(); + + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); + + final ImmutableList expectedBootstrapSegments = ImmutableList.of(); + + Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); +// serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size()); +// serviceEmitter.verifyEmitted("segment/bootstrap/time", 0); + + bootstrapper.stop(); + } + + @Test + public void testLoadOnlyRequiredBootstrapSegments() throws Exception + { + final Set segments = new HashSet<>(); + final DataSegment ds1Segment1 = makeSegment("test1", "1", Intervals.of("P1D/2011-04-01")); + final DataSegment ds1Segment2 = makeSegment("test1", "1", Intervals.of("P1D/2012-04-01")); + final DataSegment ds2Segment1 = makeSegment("test2", "1", Intervals.of("P1d/2011-04-01")); + final DataSegment ds2Segment2 = makeSegment("test2", "1", Intervals.of("P1d/2012-04-01")); + segments.add(ds1Segment1); + segments.add(ds1Segment2); + segments.add(ds2Segment1); + segments.add(ds2Segment2); + + Injector injector = Guice.createInjector( + new JacksonModule(), + new LifecycleModule(), + binder -> { + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + final BroadcastLoadingSpec broadcastMode = BroadcastLoadingSpec.loadOnly(ImmutableSet.of("test1")); + binder.bind(Key.get(BroadcastLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) + .toInstance(broadcastMode); + } + ); + + final TestCoordinatorClient coordinatorClient = new TestCoordinatorClient(segments); + final 
TestSegmentCacheManager cacheManager = new TestSegmentCacheManager(); + final SegmentManager segmentManager = new SegmentManager(cacheManager); + final SegmentLoadDropHandler handler = new SegmentLoadDropHandler( + segmentLoaderConfig, + segmentAnnouncer, + segmentManager + ); + final SegmentBootstrapper bootstrapper = new SegmentBootstrapper( + handler, + segmentLoaderConfig, + segmentAnnouncer, + serverAnnouncer, + segmentManager, + new ServerTypeConfig(ServerType.HISTORICAL), + coordinatorClient, + serviceEmitter, + injector.getInstance(DataSourceTaskIdHolder.class) + ); + + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); + + bootstrapper.start(); + + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); + Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty()); + Assert.assertEquals(ImmutableSet.of("test1"), segmentManager.getDataSourceNames()); + + final ImmutableList expectedBootstrapSegments = ImmutableList.of(ds1Segment2, ds1Segment1); + + Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size()); + serviceEmitter.verifyEmitted("segment/bootstrap/time", 1); + + bootstrapper.stop(); + } + @Test public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception { @@ -285,7 +432,8 @@ public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java 
b/services/src/main/java/org/apache/druid/cli/CliPeon.java index eb572850cda2..a4b5ae0f9d1f 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -123,6 +123,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.ResponseContextConfig; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordination.BroadcastLoadingSpec; import org.apache.druid.server.coordination.SegmentBootstrapper; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ZkCoordinator; @@ -340,6 +341,14 @@ public LookupLoadingSpec getLookupsToLoad(final Task task) { return task.getLookupLoadingSpec(); } + + @Provides + @LazySingleton + @Named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) + public BroadcastLoadingSpec getBroadcastDatasourcesToLoad(final Task task) + { + return task.getBroadcastDatasourcesLoadingSpec(); + } }, new QueryablePeonModule(), new IndexingServiceInputSourceModule(), From 0c4c14c0c4d867b8d77c531602557212ec311dd7 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Mon, 9 Sep 2024 12:39:20 -0400 Subject: [PATCH 02/10] WIP deprecation of option and task.supportQueries(). 
--- .../k8s/overlord/taskadapter/K8sTaskAdapter.java | 7 +++++-- .../taskadapter/PodTemplateTaskAdapter.java | 4 ++++ .../org/apache/druid/indexing/common/task/Task.java | 1 + .../src/main/java/org/apache/druid/cli/CliPeon.java | 13 ++++++++++++- 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java index c15698803d9f..3bd85576ba45 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java @@ -443,13 +443,16 @@ private List generateCommand(Task task) command.add(nodeType); } - // If the task type is queryable, we need to load broadcast segments on the peon, used for - // join queries +// // If the task type is queryable, we need to load broadcast segments on the peon, used for +// // join queries if (task.supportsQueries()) { command.add("--loadBroadcastSegments"); command.add("true"); } + command.add("--loadBroadcastDatasourcesMode"); + command.add(task.getBroadcastDatasourcesLoadingSpec().toString()); + command.add("--taskId"); command.add(task.getId()); log.info( diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index e8aaf1bbab35..70a8cf043c98 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -280,6 +280,10 @@ private Collection getEnv(Task task) throws IOException .withName(DruidK8sConstants.TASK_ID_ENV) .withValue(task.getId()) .build(), + new EnvVarBuilder() + .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV) + .withValue(task.getBroadcastDatasourcesLoadingSpec().toString()) + .build(), new EnvVarBuilder() .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV) .withValue(Boolean.toString(task.supportsQueries())) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 867b882c4428..4f0e0eb6cc0b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -183,6 +183,7 @@ default Set getInputSourceResources() throws UOE * MSQWorkerTask returns true from this method (because it embeds a query stack for running multi-stage queries) * even though it is not directly queryable via HTTP. */ + @Deprecated boolean supportsQueries(); /** diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index a4b5ae0f9d1f..212e4a8010da 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -180,9 +180,17 @@ public class CliPeon extends GuiceRunnable * If set to "true", the peon will bind classes necessary for loading broadcast segments. This is used for * queryable tasks, such as streaming ingestion tasks. 
*/ + @Deprecated @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments") public String loadBroadcastSegments = "false"; + /** + * Broadcast datasources loading mode. the peon will bind classes necessary for loading broadcast segments. This is used for + * queryable tasks, such as streaming ingestion tasks. + */ + @Option(name = "--loadBroadcastDatasourcesMode", title = "loadBroadcastDatasourcesMode", description = "Enable loading of broadcast datasources") // maybe there is a way to directly provide enum? + public String loadBroadcastDatasourcesMode = BroadcastLoadingSpec.Mode.ALL.toString(); + @Option(name = "--taskId", title = "taskId", description = "TaskId for fetching task.json remotely") public String taskId = ""; @@ -275,7 +283,10 @@ public void configure(Binder binder) binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType))); LifecycleModule.register(binder, Server.class); - if ("true".equals(loadBroadcastSegments)) { + BroadcastLoadingSpec.Mode mode = BroadcastLoadingSpec.Mode.valueOf(loadBroadcastDatasourcesMode); + if ("true".equals(loadBroadcastSegments) + || mode == BroadcastLoadingSpec.Mode.ALL + || mode == BroadcastLoadingSpec.Mode.ONLY_REQUIRED) { binder.install(new BroadcastSegmentLoadingModule()); } } From aa11998400dc425597f00fbf2f09639fb9067745 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 10 Sep 2024 08:37:15 -0400 Subject: [PATCH 03/10] Checkstyle --- .../overlord/taskadapter/K8sTaskAdapter.java | 2 +- .../taskadapter/PodTemplateTaskAdapter.java | 2 +- .../druid/msq/indexing/MSQControllerTask.java | 7 ++ .../msq/indexing/MSQControllerTaskTest.java | 17 ++++ .../indexing/common/task/CompactionTask.java | 7 +- .../common/task/KillUnusedSegmentsTask.java | 6 +- .../druid/indexing/common/task/Task.java | 11 ++- .../common/task/CompactionTaskTest.java | 29 ------- .../task/KillUnusedSegmentsTaskTest.java 
| 4 +- ...va => BroadcastDatasourceLoadingSpec.java} | 68 ++++++++------- .../coordination/SegmentBootstrapper.java | 20 ++--- .../metrics/DataSourceTaskIdHolder.java | 8 +- ...> BroadcastDatasourceLoadingSpecTest.java} | 84 +++++++++---------- .../coordination/SegmentBootstrapperTest.java | 19 +---- .../java/org/apache/druid/cli/CliPeon.java | 14 ++-- 15 files changed, 143 insertions(+), 155 deletions(-) rename server/src/main/java/org/apache/druid/server/coordination/{BroadcastLoadingSpec.java => BroadcastDatasourceLoadingSpec.java} (52%) rename server/src/test/java/org/apache/druid/server/coordination/{BroadcastLoadingSpecTest.java => BroadcastDatasourceLoadingSpecTest.java} (50%) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java index 3bd85576ba45..34913a7675fb 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java @@ -451,7 +451,7 @@ private List generateCommand(Task task) } command.add("--loadBroadcastDatasourcesMode"); - command.add(task.getBroadcastDatasourcesLoadingSpec().toString()); + command.add(task.getBroadcastDatasourceLoadingSpec().toString()); command.add("--taskId"); command.add(task.getId()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index 70a8cf043c98..38f2ab781784 100644 --- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -282,7 +282,7 @@ private Collection getEnv(Task task) throws IOException .build(), new EnvVarBuilder() .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV) - .withValue(task.getBroadcastDatasourcesLoadingSpec().toString()) + .withValue(task.getBroadcastDatasourceLoadingSpec().toString()) .build(), new EnvVarBuilder() .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index c3f6feaab245..4ddc8274b9d0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -59,6 +59,7 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; @@ -374,4 +375,10 @@ public LookupLoadingSpec getLookupLoadingSpec() { return LookupLoadingSpec.NONE; } + + @Override + public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() + { + return BroadcastDatasourceLoadingSpec.NONE; + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index 76586c1e1081..8d974285fb57 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -35,6 +35,7 @@ import org.apache.druid.query.Druids; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMappings; @@ -104,6 +105,22 @@ public void testGetDefaultLookupLoadingSpec() Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); } + @Test + public void testGetDefaultBroadcastDatasourceLoadingSpec() + { + MSQControllerTask controllerTask = new MSQControllerTask( + null, + MSQ_SPEC, + null, + null, + null, + null, + null, + null + ); + Assert.assertEquals(BroadcastDatasourceLoadingSpec.NONE, controllerTask.getBroadcastDatasourceLoadingSpec()); + } + @Test public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index eebf55869522..1515f26ac5a6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -89,7 +89,7 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; 
-import org.apache.druid.server.coordination.BroadcastLoadingSpec; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.duty.CompactSegments; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; @@ -241,13 +241,12 @@ public CompactionTask( : compactionRunner; this.currentSubTaskHolder = this.compactionRunner.getCurrentSubTaskHolder(); - // Do not load any lookups and broadcast segments in sub-tasks launched by compaction task, unless transformSpec is present. + // Do not load any lookups in sub-tasks launched by compaction task, unless transformSpec is present. // If transformSpec is present, we will not modify the context so that the sub-tasks can make the - // decision based on context values, loading all lookups and broadcast segments by default. + // decision based on context values, loading all lookups by default. // This is done to ensure backward compatibility since transformSpec can reference lookups. 
if (transformSpec == null) { addToContextIfAbsent(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); - addToContextIfAbsent(BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, BroadcastLoadingSpec.Mode.NONE.toString()); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index 7c50ea13b2cd..06082a988d98 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -47,7 +47,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.server.coordination.BroadcastLoadingSpec; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; @@ -414,9 +414,9 @@ public LookupLoadingSpec getLookupLoadingSpec() } @Override - public BroadcastLoadingSpec getBroadcastDatasourcesLoadingSpec() + public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() { - return BroadcastLoadingSpec.NONE; + return BroadcastDatasourceLoadingSpec.NONE; } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 4f0e0eb6cc0b..487ddb607381 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -41,7 +41,7 @@ import org.apache.druid.java.util.common.UOE; import 
org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; -import org.apache.druid.server.coordination.BroadcastLoadingSpec; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; @@ -335,8 +335,13 @@ default LookupLoadingSpec getLookupLoadingSpec() return LookupLoadingSpec.createFromContext(getContext(), LookupLoadingSpec.ALL); } - default BroadcastLoadingSpec getBroadcastDatasourcesLoadingSpec() + /** + * Specifies the list of broadcast datasources to load for this task. Tasks load ALL broadcast datasources by default. + * This behavior can be overridden by passing parameters {@link BroadcastDatasourceLoadingSpec#CTX_BROADCAST_DATASOURCE_LOADING_MODE} + * and {@link BroadcastDatasourceLoadingSpec#CTX_BROADCAST_DATASOURCES_TO_LOAD} in the task context. + */ + default BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() { - return BroadcastLoadingSpec.createFromContext(getContext(), BroadcastLoadingSpec.ALL); + return BroadcastDatasourceLoadingSpec.createFromContext(getContext(), BroadcastDatasourceLoadingSpec.ALL); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 9d1cc341bb59..c1bf649980f6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -129,7 +129,6 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.selector.settable.SettableColumnValueSelector; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; -import 
org.apache.druid.server.coordination.BroadcastLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthorizerMapper; @@ -1607,34 +1606,6 @@ public void testGetDefaultLookupLoadingSpecWithTransformSpec() Assert.assertEquals(LookupLoadingSpec.ALL, task.getLookupLoadingSpec()); } - @Test - public void testGetDefaultBroadcastLoadingSpec() - { - final Builder builder = new Builder( - DATA_SOURCE, - segmentCacheManagerFactory - ); - final CompactionTask task = builder - .interval(Intervals.of("2000-01-01/2000-01-02")) - .build(); - Assert.assertEquals(BroadcastLoadingSpec.NONE, task.getBroadcastDatasourcesLoadingSpec()); - } - - @Test - public void testGetDefaultBroadcastLoadingSpecWithTransformSpec() - { - final Builder builder = new Builder( - DATA_SOURCE, - segmentCacheManagerFactory - ); - final CompactionTask task = builder - .interval(Intervals.of("2000-01-01/2000-01-02")) - .transformSpec(new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null))) - .build(); - Assert.assertEquals(BroadcastLoadingSpec.ALL, task.getBroadcastDatasourcesLoadingSpec()); - } - - private Granularity chooseFinestGranularityHelper(List granularities) { SettableSupplier queryGranularity = new SettableSupplier<>(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index b42565f72e68..7a4df9de36c3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -37,7 +37,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; import 
org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase; -import org.apache.druid.server.coordination.BroadcastLoadingSpec; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.timeline.DataSegment; import org.assertj.core.api.Assertions; @@ -609,7 +609,7 @@ public void testGetBroadcastDatasourcesToLoad() .dataSource(DATA_SOURCE) .interval(Intervals.of("2019-03-01/2019-04-01")) .build(); - Assert.assertEquals(BroadcastLoadingSpec.Mode.NONE, task.getBroadcastDatasourcesLoadingSpec().getMode()); + Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.NONE, task.getBroadcastDatasourceLoadingSpec().getMode()); } @Test diff --git a/server/src/main/java/org/apache/druid/server/coordination/BroadcastLoadingSpec.java b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java similarity index 52% rename from server/src/main/java/org/apache/druid/server/coordination/BroadcastLoadingSpec.java rename to server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java index 7f4adbf4db7b..2950ced3f160 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/BroadcastLoadingSpec.java +++ b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java @@ -31,22 +31,22 @@ import java.util.Set; /** - * This class defines the spec for loading of broadcastDatasources for a given task. It contains 2 fields: + * This class defines the spec for loading of broadcast datasources for a given task. It contains 2 fields: *
    - *
  1. {@link BroadcastLoadingSpec#mode}: This mode defines whether broadcastDatasources need to be + *
  2. {@link BroadcastDatasourceLoadingSpec#mode}: This mode defines whether broadcastDatasources need to be * loaded for the given task, or not. It can take 3 values:
  3. *
      - *
    • ALL: Load all the broadcastDatasources.
    • - *
    • NONE: Load no broadcastDatasources.
    • - *
    • ONLY_REQUIRED: Load only the broadcastDatasources defined in broadcastDatasourcesToLoad
    • + *
    • ALL: Load all the broadcast datasources.
    • + *
    • NONE: Load no broadcast datasources.
    • + *
    • ONLY_REQUIRED: Load only the broadcast datasources defined in broadcastDatasourcesToLoad
    • *
    - *
  4. {@link BroadcastLoadingSpec#broadcastDatasourcesToLoad}: Defines the broadcastDatasources to load when the broadcastDatasourceLoadingMode is set to ONLY_REQUIRED.
  5. + *
  6. {@link BroadcastDatasourceLoadingSpec#broadcastDatasourcesToLoad}: Defines the broadcastDatasources to load when the broadcastDatasourceLoadingMode is set to ONLY_REQUIRED.
  7. *
*/ -public class BroadcastLoadingSpec +public class BroadcastDatasourceLoadingSpec { - public static final String CTX_BROADCAST_DATASOURCES_LOADING_MODE = "broadcastDatasourceLoadingMode"; + public static final String CTX_BROADCAST_DATASOURCE_LOADING_MODE = "broacastDatasourceLoadingMode"; public static final String CTX_BROADCAST_DATASOURCES_TO_LOAD = "broadcastDatasourcesToLoad"; public enum Mode @@ -58,24 +58,24 @@ public enum Mode @Nullable private final ImmutableSet broadcastDatasourcesToLoad; - public static final BroadcastLoadingSpec ALL = new BroadcastLoadingSpec(Mode.ALL, null); - public static final BroadcastLoadingSpec NONE = new BroadcastLoadingSpec(Mode.NONE, null); + public static final BroadcastDatasourceLoadingSpec ALL = new BroadcastDatasourceLoadingSpec(Mode.ALL, null); + public static final BroadcastDatasourceLoadingSpec NONE = new BroadcastDatasourceLoadingSpec(Mode.NONE, null); - private BroadcastLoadingSpec(Mode mode, @Nullable Set broadcastDatasourcesToLoad) + private BroadcastDatasourceLoadingSpec(Mode mode, @Nullable Set broadcastDatasourcesToLoad) { this.mode = mode; this.broadcastDatasourcesToLoad = broadcastDatasourcesToLoad == null ? null : ImmutableSet.copyOf(broadcastDatasourcesToLoad); } /** - * Creates a broadcastDatasourceLoadingSpec which loads only the broadcastDatasources present in the given set. + * Creates a BroadcastSegmentLoadingSpec which loads only the broadcast datasources present in the given set. 
*/ - public static BroadcastLoadingSpec loadOnly(Set broadcastDatasourcesToLoad) + public static BroadcastDatasourceLoadingSpec loadOnly(Set broadcastDatasourcesToLoad) { if (broadcastDatasourcesToLoad == null) { throw InvalidInput.exception("Expected non-null set of broadcastDatasources to load."); } - return new BroadcastLoadingSpec(Mode.ONLY_REQUIRED, broadcastDatasourcesToLoad); + return new BroadcastDatasourceLoadingSpec(Mode.ONLY_REQUIRED, broadcastDatasourcesToLoad); } public Mode getMode() @@ -84,53 +84,57 @@ public Mode getMode() } /** - * @return A non-null immutable set of broadcastDatasource names when {@link BroadcastLoadingSpec#mode} is ONLY_REQUIRED, null otherwise. + * @return A non-null immutable set of broadcast datasource names when {@link BroadcastDatasourceLoadingSpec#mode} is ONLY_REQUIRED, null otherwise. */ - public ImmutableSet getbroadcastDatasourcesToLoad() + public ImmutableSet getBroadcastDatasourcesToLoad() { return broadcastDatasourcesToLoad; } - public static BroadcastLoadingSpec createFromContext(Map context, BroadcastLoadingSpec defaultSpec) + public static BroadcastDatasourceLoadingSpec createFromContext(Map context, BroadcastDatasourceLoadingSpec defaultSpec) { if (context == null) { return defaultSpec; } - final Object broadcastDatasourceModeValue = context.get(CTX_BROADCAST_DATASOURCES_LOADING_MODE); + final Object broadcastDatasourceModeValue = context.get(CTX_BROADCAST_DATASOURCE_LOADING_MODE); if (broadcastDatasourceModeValue == null) { return defaultSpec; } - final BroadcastLoadingSpec.Mode broadcastDatasourceLoadingMode; + final BroadcastDatasourceLoadingSpec.Mode broacastDatasourceLoadingMode; try { - broadcastDatasourceLoadingMode = BroadcastLoadingSpec.Mode.valueOf(broadcastDatasourceModeValue.toString()); + broacastDatasourceLoadingMode = BroadcastDatasourceLoadingSpec.Mode.valueOf(broadcastDatasourceModeValue.toString()); } catch (IllegalArgumentException e) { - throw InvalidInput.exception("Invalid value of %s[%s]. 
Allowed values are %s", - CTX_BROADCAST_DATASOURCES_LOADING_MODE, broadcastDatasourceModeValue.toString(), Arrays.asList( - BroadcastLoadingSpec.Mode.values())); + throw InvalidInput.exception( + "Invalid value of %s[%s]. Allowed values are %s", + CTX_BROADCAST_DATASOURCE_LOADING_MODE, broadcastDatasourceModeValue.toString(), + Arrays.asList(BroadcastDatasourceLoadingSpec.Mode.values()) + ); } - if (broadcastDatasourceLoadingMode == Mode.NONE) { + if (broacastDatasourceLoadingMode == Mode.NONE) { return NONE; - } else if (broadcastDatasourceLoadingMode == Mode.ALL) { + } else if (broacastDatasourceLoadingMode == Mode.ALL) { return ALL; - } else if (broadcastDatasourceLoadingMode == Mode.ONLY_REQUIRED) { - Collection broadcastDatasourcesToLoad; + } else if (broacastDatasourceLoadingMode == Mode.ONLY_REQUIRED) { + final Collection broadcastDatasourcesToLoad; try { broadcastDatasourcesToLoad = (Collection) context.get(CTX_BROADCAST_DATASOURCES_TO_LOAD); } catch (ClassCastException e) { - throw InvalidInput.exception("Invalid value of %s[%s]. Please provide a comma-separated list of " - + "broadcastDatasource names. For example: [\"broadcastDatasourceName1\", \"broadcastDatasourceName2\"]", - CTX_BROADCAST_DATASOURCES_TO_LOAD, context.get(CTX_BROADCAST_DATASOURCES_TO_LOAD)); + throw InvalidInput.exception( + "Invalid value of %s[%s]. Please provide a comma-separated list of broadcast datasource names." 
+ + " For example: [\"datasourceName1\", \"datasourceName2\"]", + CTX_BROADCAST_DATASOURCES_TO_LOAD, context.get(CTX_BROADCAST_DATASOURCES_TO_LOAD) + ); } if (broadcastDatasourcesToLoad == null || broadcastDatasourcesToLoad.isEmpty()) { throw InvalidInput.exception("Set of broadcastDatasources to load cannot be %s for mode[ONLY_REQUIRED].", broadcastDatasourcesToLoad); } - return BroadcastLoadingSpec.loadOnly(new HashSet<>(broadcastDatasourcesToLoad)); + return BroadcastDatasourceLoadingSpec.loadOnly(new HashSet<>(broadcastDatasourcesToLoad)); } else { return defaultSpec; } @@ -154,7 +158,7 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - BroadcastLoadingSpec that = (BroadcastLoadingSpec) o; + BroadcastDatasourceLoadingSpec that = (BroadcastDatasourceLoadingSpec) o; return mode == that.mode && Objects.equals(broadcastDatasourcesToLoad, that.broadcastDatasourcesToLoad); } diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java index ffeef623a082..582c9dc116f9 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java @@ -21,13 +21,11 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import org.apache.druid.client.BootstrapSegmentsResponse; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.error.DruidException; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ServerTypeConfig; import org.apache.druid.java.util.common.ISE; @@ -59,7 +57,6 @@ import 
java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; /** * Responsible for bootstrapping segments already cached on disk and bootstrap segments fetched from the coordinator. @@ -109,7 +106,6 @@ public SegmentBootstrapper( this.coordinatorClient = coordinatorClient; this.emitter = emitter; this.datasourceHolder = datasourceHolder; - log.info("Datsource holder broadcastLoadingSpec:[%s] and lookupLoadingSpec:[%s]", datasourceHolder.getBroadcastLoadingSpec(), datasourceHolder.getLookupLoadingSpec()); } @LifecycleStart @@ -270,14 +266,15 @@ private void loadSegmentsOnStartup() throws IOException } /** - * @return a list of bootstrap segments based on {@link #datasourceHolder#getBroadcastLoadingSpec()}. When bootstrap segments cannot be found, an empty list is returned. + * @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned. + * The bootstrap segments returned are filtered by the broadcast datasources indicated by {@link DataSourceTaskIdHolder#getBroadcastDatasourceLoadingSpec()} + * if applicable. */ private List getBootstrapSegments() { - final BroadcastLoadingSpec.Mode mode = datasourceHolder.getBroadcastLoadingSpec().getMode(); - if (mode == BroadcastLoadingSpec.Mode.NONE) { - log.info("NONE"); - return ImmutableList.of(); // null? 
+ final BroadcastDatasourceLoadingSpec.Mode mode = datasourceHolder.getBroadcastDatasourceLoadingSpec().getMode(); + if (mode == BroadcastDatasourceLoadingSpec.Mode.NONE) { + return ImmutableList.of(); } log.info("Fetching bootstrap segments from the coordinator."); @@ -288,8 +285,8 @@ private List getBootstrapSegments() try { final BootstrapSegmentsResponse response = FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true); - if (mode == BroadcastLoadingSpec.Mode.ONLY_REQUIRED) { - final Set broadcastDatasourcesToLoad = datasourceHolder.getBroadcastLoadingSpec().getbroadcastDatasourcesToLoad(); + if (mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) { + final Set broadcastDatasourcesToLoad = datasourceHolder.getBroadcastDatasourceLoadingSpec().getBroadcastDatasourcesToLoad(); final List filteredBroadcast = new ArrayList<>(); response.getIterator().forEachRemaining(segment -> { if (broadcastDatasourcesToLoad.contains(segment.getDataSource())) { @@ -297,7 +294,6 @@ private List getBootstrapSegments() } }); bootstrapSegments = filteredBroadcast; - log.info("GRRRR shrunk size[%d]", bootstrapSegments.size()); } else { bootstrapSegments = ImmutableList.copyOf(response.getIterator()); } diff --git a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java index bb698ce46f0b..87002a5157f8 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java +++ b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java @@ -21,7 +21,7 @@ import com.google.inject.Inject; import com.google.inject.name.Named; -import org.apache.druid.server.coordination.BroadcastLoadingSpec; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; public class DataSourceTaskIdHolder @@ -44,7 +44,7 @@ public class 
DataSourceTaskIdHolder @Named(BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) @Inject(optional = true) - BroadcastLoadingSpec broadcastLoadingSpec = BroadcastLoadingSpec.ALL; + BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec = BroadcastDatasourceLoadingSpec.ALL; public String getDataSource() { @@ -61,8 +61,8 @@ public LookupLoadingSpec getLookupLoadingSpec() return lookupLoadingSpec; } - public BroadcastLoadingSpec getBroadcastLoadingSpec() + public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() { - return broadcastLoadingSpec; + return broadcastDatasourceLoadingSpec; } } diff --git a/server/src/test/java/org/apache/druid/server/coordination/BroadcastLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java similarity index 50% rename from server/src/test/java/org/apache/druid/server/coordination/BroadcastLoadingSpecTest.java rename to server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java index b52e33432805..9bb19ea585c8 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/BroadcastLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java @@ -33,37 +33,37 @@ import java.util.Set; @RunWith(JUnitParamsRunner.class) -public class BroadcastLoadingSpecTest +public class BroadcastDatasourceLoadingSpecTest { @Test public void testLoadingAllBroadcastDatasources() { - final BroadcastLoadingSpec spec = BroadcastLoadingSpec.ALL; - Assert.assertEquals(BroadcastLoadingSpec.Mode.ALL, spec.getMode()); - Assert.assertNull(spec.getbroadcastDatasourcesToLoad()); + final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.ALL; + Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ALL, spec.getMode()); + Assert.assertNull(spec.getBroadcastDatasourcesToLoad()); } @Test public void testLoadingNoLookups() { - final BroadcastLoadingSpec spec = 
BroadcastLoadingSpec.NONE; - Assert.assertEquals(BroadcastLoadingSpec.Mode.NONE, spec.getMode()); - Assert.assertNull(spec.getbroadcastDatasourcesToLoad()); + final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.NONE; + Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.NONE, spec.getMode()); + Assert.assertNull(spec.getBroadcastDatasourcesToLoad()); } @Test public void testLoadingOnlyRequiredLookups() { final Set broadcastDatasourcesToLoad = ImmutableSet.of("ds1", "ds2"); - final BroadcastLoadingSpec spec = BroadcastLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2")); - Assert.assertEquals(BroadcastLoadingSpec.Mode.ONLY_REQUIRED, spec.getMode()); - Assert.assertEquals(broadcastDatasourcesToLoad, spec.getbroadcastDatasourcesToLoad()); + final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2")); + Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED, spec.getMode()); + Assert.assertEquals(broadcastDatasourcesToLoad, spec.getBroadcastDatasourcesToLoad()); } @Test public void testLoadingOnlyRequiredLookupsWithNullList() { - DruidException exception = Assert.assertThrows(DruidException.class, () -> BroadcastLoadingSpec.loadOnly(null)); + DruidException exception = Assert.assertThrows(DruidException.class, () -> BroadcastDatasourceLoadingSpec.loadOnly(null)); Assert.assertEquals("Expected non-null set of broadcastDatasources to load.", exception.getMessage()); } @@ -72,18 +72,18 @@ public void testCreateBroadcastLoadingSpecFromNullContext() { // Default spec is returned in the case of context=null. 
Assert.assertEquals( - BroadcastLoadingSpec.NONE, - BroadcastLoadingSpec.createFromContext( + BroadcastDatasourceLoadingSpec.NONE, + BroadcastDatasourceLoadingSpec.createFromContext( null, - BroadcastLoadingSpec.NONE + BroadcastDatasourceLoadingSpec.NONE ) ); Assert.assertEquals( - BroadcastLoadingSpec.ALL, - BroadcastLoadingSpec.createFromContext( + BroadcastDatasourceLoadingSpec.ALL, + BroadcastDatasourceLoadingSpec.createFromContext( null, - BroadcastLoadingSpec.ALL + BroadcastDatasourceLoadingSpec.ALL ) ); } @@ -93,32 +93,32 @@ public void testCreateBroadcastLoadingSpecFromContext() { // Only required lookups are returned in the case of context having the lookup keys. Assert.assertEquals( - BroadcastLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2")), - BroadcastLoadingSpec.createFromContext( + BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2")), + BroadcastDatasourceLoadingSpec.createFromContext( ImmutableMap.of( - BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, Arrays.asList("ds1", "ds2"), - BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, BroadcastLoadingSpec.Mode.ONLY_REQUIRED + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, Arrays.asList("ds1", "ds2"), + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED ), - BroadcastLoadingSpec.ALL + BroadcastDatasourceLoadingSpec.ALL ) ); // No lookups are returned in the case of context having mode=NONE, irrespective of the default spec. 
Assert.assertEquals( - BroadcastLoadingSpec.NONE, - BroadcastLoadingSpec.createFromContext( + BroadcastDatasourceLoadingSpec.NONE, + BroadcastDatasourceLoadingSpec.createFromContext( ImmutableMap.of( - BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, BroadcastLoadingSpec.Mode.NONE), - BroadcastLoadingSpec.ALL + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.NONE), + BroadcastDatasourceLoadingSpec.ALL ) ); // All lookups are returned in the case of context having mode=ALL, irrespective of the default spec. Assert.assertEquals( - BroadcastLoadingSpec.ALL, - BroadcastLoadingSpec.createFromContext( - ImmutableMap.of(BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, BroadcastLoadingSpec.Mode.ALL), - BroadcastLoadingSpec.NONE + BroadcastDatasourceLoadingSpec.ALL, + BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of(BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ALL), + BroadcastDatasourceLoadingSpec.NONE ) ); } @@ -134,12 +134,12 @@ public void testCreateBroadcastLoadingSpecFromContext() "none" } ) - public void testCreateLookupLoadingSpecFromInvalidModeInContext(final String mode) + public void testSpecFromInvalidModeInContext(final String mode) { - final DruidException exception = Assert.assertThrows(DruidException.class, () -> BroadcastLoadingSpec.createFromContext( - ImmutableMap.of(BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, mode), BroadcastLoadingSpec.ALL)); + final DruidException exception = Assert.assertThrows(DruidException.class, () -> BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of(BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, mode), BroadcastDatasourceLoadingSpec.ALL)); Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. 
Allowed values are [ALL, NONE, ONLY_REQUIRED]", - BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, mode), exception.getMessage()); + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, mode), exception.getMessage()); } @@ -150,17 +150,17 @@ public void testCreateLookupLoadingSpecFromInvalidModeInContext(final String mod "foo]" } ) - public void testCreateLookupLoadingSpecFromInvalidLookupsInContext(Object lookupsToLoad) + public void testSpecFromInvalidBroadcastDatasourcesInContext(final Object lookupsToLoad) { final DruidException exception = Assert.assertThrows(DruidException.class, () -> - BroadcastLoadingSpec.createFromContext( + BroadcastDatasourceLoadingSpec.createFromContext( ImmutableMap.of( - BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, lookupsToLoad, - BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_LOADING_MODE, BroadcastLoadingSpec.Mode.ONLY_REQUIRED), - BroadcastLoadingSpec.ALL) + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, lookupsToLoad, + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED), + BroadcastDatasourceLoadingSpec.ALL) ); Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Please provide a comma-separated list of " - + "broadcastDatasource names. For example: [\"lookupName1\", \"lookupName2\"]", - BroadcastLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, lookupsToLoad), exception.getMessage()); + + "broadcast datasource names. 
For example: [\"datasourceName1\", \"datasourceName2\"]", + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, lookupsToLoad), exception.getMessage()); } -} \ No newline at end of file +} diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java index 6df9d47400ae..fe1424e27005 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java @@ -25,23 +25,18 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Scopes; -import com.google.inject.TypeLiteral; import com.google.inject.name.Names; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ServerTypeConfig; -import org.apache.druid.initialization.ServerInjectorBuilder; import org.apache.druid.jackson.JacksonModule; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.metrics.DataSourceTaskIdHolder; -import org.apache.druid.server.metrics.MetricsModule; -import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Before; @@ -49,15 +44,11 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import javax.validation.Validation; -import javax.validation.Validator; import java.io.File; import java.io.IOException; -import java.util.Arrays; import 
java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Properties; import java.util.Set; import static org.apache.druid.server.TestSegmentUtils.makeSegment; @@ -305,8 +296,8 @@ public void testLoadNoBootstrapSegments() throws Exception new LifecycleModule(), binder -> { binder.bindScope(LazySingleton.class, Scopes.SINGLETON); - final BroadcastLoadingSpec broadcastMode = BroadcastLoadingSpec.NONE; - binder.bind(Key.get(BroadcastLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) + final BroadcastDatasourceLoadingSpec broadcastMode = BroadcastDatasourceLoadingSpec.NONE; + binder.bind(Key.get(BroadcastDatasourceLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) .toInstance(broadcastMode); } ); @@ -344,8 +335,6 @@ public void testLoadNoBootstrapSegments() throws Exception Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); -// serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size()); -// serviceEmitter.verifyEmitted("segment/bootstrap/time", 0); bootstrapper.stop(); } @@ -368,8 +357,8 @@ public void testLoadOnlyRequiredBootstrapSegments() throws Exception new LifecycleModule(), binder -> { binder.bindScope(LazySingleton.class, Scopes.SINGLETON); - final BroadcastLoadingSpec broadcastMode = BroadcastLoadingSpec.loadOnly(ImmutableSet.of("test1")); - binder.bind(Key.get(BroadcastLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) + final BroadcastDatasourceLoadingSpec broadcastMode = BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("test1")); + binder.bind(Key.get(BroadcastDatasourceLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) .toInstance(broadcastMode); } ); diff --git 
a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 212e4a8010da..e29c777e74af 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -123,7 +123,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.ResponseContextConfig; import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.coordination.BroadcastLoadingSpec; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.coordination.SegmentBootstrapper; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ZkCoordinator; @@ -189,7 +189,7 @@ public class CliPeon extends GuiceRunnable * queryable tasks, such as streaming ingestion tasks. */ @Option(name = "--loadBroadcastDatasourcesMode", title = "loadBroadcastDatasourcesMode", description = "Enable loading of broadcast datasources") // maybe there is a way to directly provide enum? 
- public String loadBroadcastDatasourcesMode = BroadcastLoadingSpec.Mode.ALL.toString(); + public String loadBroadcastDatasourcesMode = BroadcastDatasourceLoadingSpec.Mode.ALL.toString(); @Option(name = "--taskId", title = "taskId", description = "TaskId for fetching task.json remotely") public String taskId = ""; @@ -283,10 +283,10 @@ public void configure(Binder binder) binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType))); LifecycleModule.register(binder, Server.class); - BroadcastLoadingSpec.Mode mode = BroadcastLoadingSpec.Mode.valueOf(loadBroadcastDatasourcesMode); + BroadcastDatasourceLoadingSpec.Mode mode = BroadcastDatasourceLoadingSpec.Mode.valueOf(loadBroadcastDatasourcesMode); if ("true".equals(loadBroadcastSegments) - || mode == BroadcastLoadingSpec.Mode.ALL - || mode == BroadcastLoadingSpec.Mode.ONLY_REQUIRED) { + || mode == BroadcastDatasourceLoadingSpec.Mode.ALL + || mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) { binder.install(new BroadcastSegmentLoadingModule()); } } @@ -356,9 +356,9 @@ public LookupLoadingSpec getLookupsToLoad(final Task task) @Provides @LazySingleton @Named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) - public BroadcastLoadingSpec getBroadcastDatasourcesToLoad(final Task task) + public BroadcastDatasourceLoadingSpec getBroadcastDatasourcesToLoad(final Task task) { - return task.getBroadcastDatasourcesLoadingSpec(); + return task.getBroadcastDatasourceLoadingSpec(); } }, new QueryablePeonModule(), From 38e13ecfad8eebc0fd6e4aff7f1ace1ac086ec4a Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 10 Sep 2024 08:58:32 -0400 Subject: [PATCH 04/10] Some commentary around deprecation, etc. 
--- .../k8s/overlord/taskadapter/K8sTaskAdapter.java | 9 +++++---- .../apache/druid/indexing/common/task/Task.java | 2 ++ .../druid/indexing/overlord/ForkingTaskRunner.java | 6 +++++- .../BroadcastDatasourceLoadingSpec.java | 14 +++++++------- .../main/java/org/apache/druid/cli/CliPeon.java | 13 +++++++++---- 5 files changed, 28 insertions(+), 16 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java index 34913a7675fb..cc689f925f4f 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java @@ -443,15 +443,16 @@ private List generateCommand(Task task) command.add(nodeType); } -// // If the task type is queryable, we need to load broadcast segments on the peon, used for -// // join queries + // If the task type is queryable, we need to load broadcast segments on the peon, used for + // join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here + // for backwards compatibility and can be removed in a future release. 
if (task.supportsQueries()) { command.add("--loadBroadcastSegments"); command.add("true"); } - command.add("--loadBroadcastDatasourcesMode"); - command.add(task.getBroadcastDatasourceLoadingSpec().toString()); + command.add("--loadBroadcastDatasourceMode"); + command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString()); command.add("--taskId"); command.add(task.getId()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 487ddb607381..5b21963300bb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -176,6 +176,8 @@ default Set getInputSourceResources() throws UOE QueryRunner getQueryRunner(Query query); /** + *

Deprecated, see {@link #getBroadcastDatasourceLoadingSpec} instead.

+ * * True if this task type embeds a query stack, and therefore should preload resources (like broadcast tables) * that may be needed by queries. * diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index c676877c1107..7e4dd6c39c22 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -376,12 +376,16 @@ public TaskStatus call() } // If the task type is queryable, we need to load broadcast segments on the peon, used for - // join queries + // join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here + // for backwards compatibility and can be removed in a future release. if (task.supportsQueries()) { command.add("--loadBroadcastSegments"); command.add("true"); } + command.add("--loadBroadcastDatasourceMode"); + command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString()); + if (!taskFile.exists()) { jsonMapper.writeValue(taskFile, task); } diff --git a/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java index 2950ced3f160..3629844ce942 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java +++ b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java @@ -40,13 +40,13 @@ *
  • NONE: Load no broadcast datasources.
  • *
  • ONLY_REQUIRED: Load only the broadcast datasources defined in broadcastDatasourcesToLoad
  • * - *
  • {@link BroadcastDatasourceLoadingSpec#broadcastDatasourcesToLoad}: Defines the broadcastDatasources to load when the broacastDatasourceLoadingMode is set to ONLY_REQUIRED.
  • + *
  • {@link BroadcastDatasourceLoadingSpec#broadcastDatasourcesToLoad}: Defines the broadcastDatasources to load when the broadcastDatasourceLoadingMode is set to ONLY_REQUIRED.
  • * */ public class BroadcastDatasourceLoadingSpec { - public static final String CTX_BROADCAST_DATASOURCE_LOADING_MODE = "broacastDatasourceLoadingMode"; + public static final String CTX_BROADCAST_DATASOURCE_LOADING_MODE = "broadcastDatasourceLoadingMode"; public static final String CTX_BROADCAST_DATASOURCES_TO_LOAD = "broadcastDatasourcesToLoad"; public enum Mode @@ -102,9 +102,9 @@ public static BroadcastDatasourceLoadingSpec createFromContext(Map broadcastDatasourcesToLoad; try { broadcastDatasourcesToLoad = (Collection) context.get(CTX_BROADCAST_DATASOURCES_TO_LOAD); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index e29c777e74af..66b62ca309e4 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -177,18 +177,22 @@ public class CliPeon extends GuiceRunnable private boolean isZkEnabled = true; /** + *

    This option is deprecated; use the {@link #loadBroadcastDatasourcesMode} option instead.

    + * * If set to "true", the peon will bind classes necessary for loading broadcast segments. This is used for * queryable tasks, such as streaming ingestion tasks. + * */ @Deprecated @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments") public String loadBroadcastSegments = "false"; /** - * Broadcast datasources loading mode. the peon will bind classes necessary for loading broadcast segments. This is used for - * queryable tasks, such as streaming ingestion tasks. + * Broadcast datasource loading mode. The peon will bind classes required for loading broadcast segments if + * the mode is {@link BroadcastDatasourceLoadingSpec.Mode#ALL} or {@link BroadcastDatasourceLoadingSpec.Mode#ONLY_REQUIRED}. */ - @Option(name = "--loadBroadcastDatasourcesMode", title = "loadBroadcastDatasourcesMode", description = "Enable loading of broadcast datasources") // maybe there is a way to directly provide enum? + @Option(name = "--loadBroadcastDatasourceMode", title = "loadBroadcastDatasourceMode",
+ description = "Specify the broadcast datasource loading mode for the peon. 
Supported values are ALL, NONE, ONLY_REQUIRED.") public String loadBroadcastDatasourcesMode = BroadcastDatasourceLoadingSpec.Mode.ALL.toString(); @Option(name = "--taskId", title = "taskId", description = "TaskId for fetching task.json remotely") @@ -283,7 +287,8 @@ public void configure(Binder binder) binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType))); LifecycleModule.register(binder, Server.class); - BroadcastDatasourceLoadingSpec.Mode mode = BroadcastDatasourceLoadingSpec.Mode.valueOf(loadBroadcastDatasourcesMode); + final BroadcastDatasourceLoadingSpec.Mode mode = + BroadcastDatasourceLoadingSpec.Mode.valueOf(loadBroadcastDatasourcesMode); if ("true".equals(loadBroadcastSegments) || mode == BroadcastDatasourceLoadingSpec.Mode.ALL || mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) { From 7bafa270678145dbd08036ce73fbcda4e8c42e8c Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 10 Sep 2024 09:31:42 -0400 Subject: [PATCH 05/10] Fixup and add K8 pod template test. 
--- .../overlord/common/DruidK8sConstants.java | 1 + .../taskadapter/PodTemplateTaskAdapter.java | 6 +-- .../PodTemplateTaskAdapterTest.java | 43 ++++++++++++++++++- .../src/test/resources/expectedNoopJob.yaml | 2 + .../test/resources/expectedNoopJobBase.yaml | 2 + .../resources/expectedNoopJobLongIds.yaml | 2 + .../resources/expectedNoopJobNoTaskJson.yaml | 2 + .../expectedNoopJobTlsEnabledBase.yaml | 2 + 8 files changed, 56 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java index 568f8ed5a117..644a7f109b25 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java @@ -37,6 +37,7 @@ public class DruidK8sConstants public static final String TASK_JSON_ENV = "TASK_JSON"; public static final String TASK_DIR_ENV = "TASK_DIR"; public static final String TASK_ID_ENV = "TASK_ID"; + public static final String LOAD_BROADCAST_DATASOURCE_MODE_ENV = "LOAD_BROADCAST_DATASOURCE_MODE"; public static final String LOAD_BROADCAST_SEGMENTS_ENV = "LOAD_BROADCAST_SEGMENTS"; public static final String JAVA_OPTS = "JAVA_OPTS"; public static final String DRUID_HOST_ENV = "druid_host"; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index 38f2ab781784..321fe3fcb3e8 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -281,9 +281,9 @@ private Collection getEnv(Task task) throws IOException .withValue(task.getId()) .build(), new EnvVarBuilder() - .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV) - .withValue(task.getBroadcastDatasourceLoadingSpec().toString()) - .build(), + .withName(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV) + .withValue(task.getBroadcastDatasourceLoadingSpec().getMode().toString()) + .build(), new EnvVarBuilder() .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV) .withValue(Boolean.toString(task.supportsQueries())) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java index ac2aaa705581..b25f23a25ddc 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java @@ -46,6 +46,7 @@ import org.apache.druid.k8s.overlord.execution.Selector; import org.apache.druid.k8s.overlord.execution.SelectorBasedPodTemplateSelectStrategy; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.tasklogs.TaskLogs; import org.easymock.EasyMock; import org.junit.Assert; @@ -537,6 +538,7 @@ public void test_fromTask_taskSupportsQueries() throws IOException EasyMock.expect(task.getId()).andReturn("id").anyTimes(); EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes(); EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes(); + 
EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes(); EasyMock.replay(task); Job actual = adapter.fromTask(task); @@ -550,7 +552,46 @@ public void test_fromTask_taskSupportsQueries() throws IOException } @Test - public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperites() throws IOException + public void test_fromTask_withBroadcastDatasourceLoadingModeAll() throws IOException + { + Path templatePath = Files.createFile(tempDir.resolve("noop.yaml")); + mapper.writeValue(templatePath.toFile(), podTemplateSpec); + + Properties props = new Properties(); + props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); + props.setProperty("druid.indexer.runner.k8s.podTemplate.queryable", templatePath.toString()); + + PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( + taskRunnerConfig, + taskConfig, + node, + mapper, + props, + taskLogs, + dynamicConfigRef + ); + + Task task = EasyMock.mock(Task.class); + EasyMock.expect(task.supportsQueries()).andReturn(true); + EasyMock.expect(task.getType()).andReturn("queryable").anyTimes(); + EasyMock.expect(task.getId()).andReturn("id").anyTimes(); + EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes(); + EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes(); + EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes(); + + EasyMock.replay(task); + Job actual = adapter.fromTask(task); + EasyMock.verify(task); + + Assertions.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ALL.toString(), actual.getSpec().getTemplate() + .getSpec().getContainers() + .get(0).getEnv().stream() + .filter(env -> env.getName().equals(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV)) + .collect(Collectors.toList()).get(0).getValue()); + } + + @Test + public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperties() throws IOException { Path 
baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml index ddae7c0567f2..ac539c5da154 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml @@ -45,6 +45,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" - name: "TASK_JSON" diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml index 532c3dd53e82..f7c2ff958bbc 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml @@ -45,6 +45,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" - name: "TASK_JSON" diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml index d6c316dcdde8..3a3af1528b56 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml @@ -44,6 +44,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: 
"api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" - name: "TASK_JSON" diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml index 90ae99709598..ec7f9a062481 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml @@ -43,6 +43,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" image: one diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml index 0e52beac9e32..84457fb3175c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml @@ -44,6 +44,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" - name: "TASK_JSON" From dbaea794efba268098379d86360c9cc3619f33b8 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 10 Sep 2024 09:45:16 -0400 Subject: [PATCH 06/10] Remove unused import. 
--- .../org/apache/druid/indexing/common/task/CompactionTask.java | 1 - services/src/main/java/org/apache/druid/cli/CliPeon.java | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 1515f26ac5a6..73c8a35405c4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -89,7 +89,6 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; -import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.duty.CompactSegments; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 66b62ca309e4..ca8c4181cf96 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -184,7 +184,9 @@ public class CliPeon extends GuiceRunnable * */ @Deprecated - @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments") + @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", + description = "Enable loading of broadcast segments. This option is deprecated and will be removed in a" + + " future release. 
Use --loadBroadcastDatasourceMode instead.") public String loadBroadcastSegments = "false"; /** From 1f8908fbad05225057ea464a9a74f2aef4151378 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 10 Sep 2024 11:53:03 -0400 Subject: [PATCH 07/10] Fix up test and lookup nullable annotation. --- .../java/org/apache/druid/indexing/common/task/Task.java | 1 - .../server/coordination/SegmentBootstrapperCacheTest.java | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 5b21963300bb..c8a0712da60d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -331,7 +331,6 @@ static TaskInfo toTaskIdentifierInfo(TaskInfo Date: Tue, 10 Sep 2024 12:07:04 -0400 Subject: [PATCH 08/10] Remove unused import --- .../main/java/org/apache/druid/indexing/common/task/Task.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index d8b4ee9aa893..5f562a0272a3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -48,7 +48,6 @@ import org.apache.druid.server.security.ResourceType; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.util.Map; import java.util.Optional; import java.util.Set; From 8263305a97cdb31f267af0e7b4f4ed98df2c0358 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Wed, 11 Sep 2024 08:01:10 -0400 Subject: [PATCH 09/10] Remove deprecated annotation. 
--- .../main/java/org/apache/druid/indexing/common/task/Task.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 5f562a0272a3..cacdc47f520a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -175,8 +175,6 @@ default Set getInputSourceResources() throws UOE QueryRunner getQueryRunner(Query query); /** - *

    Deprecated, see {@link #getBroadcastDatasourceLoadingSpec} instead.

    - * * True if this task type embeds a query stack, and therefore should preload resources (like broadcast tables) * that may be needed by queries. Tasks supporting queries are also allocated processing buffers, processing threads * and merge buffers. Those which do not should not assume that these resources are present and must explicitly allocate @@ -186,7 +184,6 @@ default Set getInputSourceResources() throws UOE * MSQWorkerTask returns true from this method (because it embeds a query stack for running multi-stage queries) * even though it is not directly queryable via HTTP. */ - @Deprecated boolean supportsQueries(); /** From 52fe302f18a30329b1059f9ab341dddc36c7dbe8 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Thu, 12 Sep 2024 09:28:42 -0400 Subject: [PATCH 10/10] Log and fix up case. --- .../server/coordination/BroadcastDatasourceLoadingSpec.java | 6 +++--- .../druid/server/coordination/SegmentBootstrapper.java | 3 ++- .../coordination/BroadcastDatasourceLoadingSpecTest.java | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java index 3629844ce942..3a11027311e6 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java +++ b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java @@ -73,7 +73,7 @@ private BroadcastDatasourceLoadingSpec(Mode mode, @Nullable Set broadcas public static BroadcastDatasourceLoadingSpec loadOnly(Set broadcastDatasourcesToLoad) { if (broadcastDatasourcesToLoad == null) { - throw InvalidInput.exception("Expected non-null set of broadcastDatasources to load."); + throw InvalidInput.exception("Expected non-null set of broadcast datasources to load."); } return new BroadcastDatasourceLoadingSpec(Mode.ONLY_REQUIRED, 
broadcastDatasourcesToLoad); } @@ -132,7 +132,7 @@ public static BroadcastDatasourceLoadingSpec createFromContext(Map(broadcastDatasourcesToLoad)); } else { @@ -143,7 +143,7 @@ public static BroadcastDatasourceLoadingSpec createFromContext(Map bootstrapSegments = new ArrayList<>(); diff --git a/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java index 9bb19ea585c8..ddec0901965d 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java @@ -64,7 +64,7 @@ public void testLoadingOnlyRequiredLookups() public void testLoadingOnlyRequiredLookupsWithNullList() { DruidException exception = Assert.assertThrows(DruidException.class, () -> BroadcastDatasourceLoadingSpec.loadOnly(null)); - Assert.assertEquals("Expected non-null set of broadcastDatasources to load.", exception.getMessage()); + Assert.assertEquals("Expected non-null set of broadcast datasources to load.", exception.getMessage()); } @Test