diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java index 568f8ed5a117..644a7f109b25 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java @@ -37,6 +37,7 @@ public class DruidK8sConstants public static final String TASK_JSON_ENV = "TASK_JSON"; public static final String TASK_DIR_ENV = "TASK_DIR"; public static final String TASK_ID_ENV = "TASK_ID"; + public static final String LOAD_BROADCAST_DATASOURCE_MODE_ENV = "LOAD_BROADCAST_DATASOURCE_MODE"; public static final String LOAD_BROADCAST_SEGMENTS_ENV = "LOAD_BROADCAST_SEGMENTS"; public static final String JAVA_OPTS = "JAVA_OPTS"; public static final String DRUID_HOST_ENV = "druid_host"; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java index c15698803d9f..cc689f925f4f 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java @@ -444,12 +444,16 @@ private List generateCommand(Task task) } // If the task type is queryable, we need to load broadcast segments on the peon, used for - // join queries + // join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here + // for backwards compatibility and can be removed in a future release. if (task.supportsQueries()) { command.add("--loadBroadcastSegments"); command.add("true"); } + command.add("--loadBroadcastDatasourceMode"); + command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString()); + command.add("--taskId"); command.add(task.getId()); log.info( diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index e8aaf1bbab35..321fe3fcb3e8 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -280,6 +280,10 @@ private Collection getEnv(Task task) throws IOException .withName(DruidK8sConstants.TASK_ID_ENV) .withValue(task.getId()) .build(), + new EnvVarBuilder() + .withName(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV) + .withValue(task.getBroadcastDatasourceLoadingSpec().getMode().toString()) + .build(), new EnvVarBuilder() .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV) .withValue(Boolean.toString(task.supportsQueries())) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java index ac2aaa705581..b25f23a25ddc 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java @@ -46,6 +46,7 @@ import org.apache.druid.k8s.overlord.execution.Selector; import org.apache.druid.k8s.overlord.execution.SelectorBasedPodTemplateSelectStrategy; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.tasklogs.TaskLogs; import org.easymock.EasyMock; import org.junit.Assert; @@ -537,6 +538,7 @@ public void test_fromTask_taskSupportsQueries() throws IOException EasyMock.expect(task.getId()).andReturn("id").anyTimes(); EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes(); EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes(); + EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes(); EasyMock.replay(task); Job actual = adapter.fromTask(task); @@ -550,7 +552,46 @@ public void test_fromTask_taskSupportsQueries() throws IOException } @Test - public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperites() throws IOException + public void test_fromTask_withBroadcastDatasourceLoadingModeAll() throws IOException + { + Path templatePath = Files.createFile(tempDir.resolve("noop.yaml")); + mapper.writeValue(templatePath.toFile(), podTemplateSpec); + + Properties props = new Properties(); + props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); + props.setProperty("druid.indexer.runner.k8s.podTemplate.queryable", templatePath.toString()); + + PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( + taskRunnerConfig, + taskConfig, + node, + mapper, + props, + taskLogs, + dynamicConfigRef + ); + + Task task = EasyMock.mock(Task.class); + EasyMock.expect(task.supportsQueries()).andReturn(true); + EasyMock.expect(task.getType()).andReturn("queryable").anyTimes(); + EasyMock.expect(task.getId()).andReturn("id").anyTimes(); + EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes(); + EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes(); + EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes(); + + EasyMock.replay(task); + Job actual = adapter.fromTask(task); + EasyMock.verify(task); + + Assertions.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ALL.toString(), actual.getSpec().getTemplate() + .getSpec().getContainers() + .get(0).getEnv().stream() + .filter(env -> env.getName().equals(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV)) + .collect(Collectors.toList()).get(0).getValue()); + } + + @Test + public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperties() throws IOException { Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml index ddae7c0567f2..ac539c5da154 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml @@ -45,6 +45,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" - name: "TASK_JSON" diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml index 532c3dd53e82..f7c2ff958bbc 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml @@ -45,6 +45,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" - name: "TASK_JSON" diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml index d6c316dcdde8..3a3af1528b56 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml @@ -44,6 +44,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" - name: "TASK_JSON" diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml index 90ae99709598..ec7f9a062481 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml @@ -43,6 +43,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" image: one diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml index 0e52beac9e32..84457fb3175c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml @@ -44,6 +44,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" - name: "TASK_JSON" diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index c3f6feaab245..4ddc8274b9d0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -59,6 +59,7 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; @@ -374,4 +375,10 @@ public LookupLoadingSpec getLookupLoadingSpec() { return LookupLoadingSpec.NONE; } + + @Override + public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() + { + return BroadcastDatasourceLoadingSpec.NONE; + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index 76586c1e1081..8d974285fb57 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -35,6 +35,7 @@ import org.apache.druid.query.Druids; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMappings; @@ -104,6 +105,22 @@ public void testGetDefaultLookupLoadingSpec() Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); } + @Test + public void testGetDefaultBroadcastDatasourceLoadingSpec() + { + MSQControllerTask controllerTask = new MSQControllerTask( + null, + MSQ_SPEC, + null, + null, + null, + null, + null, + null + ); + Assert.assertEquals(BroadcastDatasourceLoadingSpec.NONE, controllerTask.getBroadcastDatasourceLoadingSpec()); + } + @Test public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index e1f6d2915eea..06082a988d98 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; @@ -412,6 +413,12 @@ public LookupLoadingSpec getLookupLoadingSpec() return LookupLoadingSpec.NONE; } + @Override + public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() + { + return BroadcastDatasourceLoadingSpec.NONE; + } + @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 9b882e2e8d2b..c80c1fecbe5a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -41,13 +41,13 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -327,9 +327,18 @@ static TaskInfo toTaskIdentifierInfo(TaskInfo + *
  • {@link BroadcastDatasourceLoadingSpec#mode}: This mode defines whether broadcastDatasources need to be + * loaded for the given task, or not. It can take 3 values:
  • + *
      + *
    • ALL: Load all the broadcast datasources.
    • + *
    • NONE: Load no broadcast datasources.
    • + *
    • ONLY_REQUIRED: Load only the broadcast datasources defined in broadcastDatasourcesToLoad
    • + *
    + *
  • {@link BroadcastDatasourceLoadingSpec#broadcastDatasourcesToLoad}: Defines the broadcastDatasources to load when the broadcastDatasourceLoadingMode is set to ONLY_REQUIRED.
  • + * + */ +public class BroadcastDatasourceLoadingSpec +{ + + public static final String CTX_BROADCAST_DATASOURCE_LOADING_MODE = "broadcastDatasourceLoadingMode"; + public static final String CTX_BROADCAST_DATASOURCES_TO_LOAD = "broadcastDatasourcesToLoad"; + + public enum Mode + { + ALL, NONE, ONLY_REQUIRED + } + + private final Mode mode; + @Nullable + private final ImmutableSet broadcastDatasourcesToLoad; + + public static final BroadcastDatasourceLoadingSpec ALL = new BroadcastDatasourceLoadingSpec(Mode.ALL, null); + public static final BroadcastDatasourceLoadingSpec NONE = new BroadcastDatasourceLoadingSpec(Mode.NONE, null); + + private BroadcastDatasourceLoadingSpec(Mode mode, @Nullable Set broadcastDatasourcesToLoad) + { + this.mode = mode; + this.broadcastDatasourcesToLoad = broadcastDatasourcesToLoad == null ? null : ImmutableSet.copyOf(broadcastDatasourcesToLoad); + } + + /** + * Creates a BroadcastSegmentLoadingSpec which loads only the broadcast datasources present in the given set. + */ + public static BroadcastDatasourceLoadingSpec loadOnly(Set broadcastDatasourcesToLoad) + { + if (broadcastDatasourcesToLoad == null) { + throw InvalidInput.exception("Expected non-null set of broadcast datasources to load."); + } + return new BroadcastDatasourceLoadingSpec(Mode.ONLY_REQUIRED, broadcastDatasourcesToLoad); + } + + public Mode getMode() + { + return mode; + } + + /** + * @return A non-null immutable set of broadcast datasource names when {@link BroadcastDatasourceLoadingSpec#mode} is ONLY_REQUIRED, null otherwise. + */ + public ImmutableSet getBroadcastDatasourcesToLoad() + { + return broadcastDatasourcesToLoad; + } + + public static BroadcastDatasourceLoadingSpec createFromContext(Map context, BroadcastDatasourceLoadingSpec defaultSpec) + { + if (context == null) { + return defaultSpec; + } + + final Object broadcastDatasourceModeValue = context.get(CTX_BROADCAST_DATASOURCE_LOADING_MODE); + if (broadcastDatasourceModeValue == null) { + return defaultSpec; + } + + final BroadcastDatasourceLoadingSpec.Mode broadcastDatasourceLoadingMode; + try { + broadcastDatasourceLoadingMode = BroadcastDatasourceLoadingSpec.Mode.valueOf(broadcastDatasourceModeValue.toString()); + } + catch (IllegalArgumentException e) { + throw InvalidInput.exception( + "Invalid value of %s[%s]. Allowed values are %s", + CTX_BROADCAST_DATASOURCE_LOADING_MODE, broadcastDatasourceModeValue.toString(), + Arrays.asList(BroadcastDatasourceLoadingSpec.Mode.values()) + ); + } + + if (broadcastDatasourceLoadingMode == Mode.NONE) { + return NONE; + } else if (broadcastDatasourceLoadingMode == Mode.ALL) { + return ALL; + } else if (broadcastDatasourceLoadingMode == Mode.ONLY_REQUIRED) { + final Collection broadcastDatasourcesToLoad; + try { + broadcastDatasourcesToLoad = (Collection) context.get(CTX_BROADCAST_DATASOURCES_TO_LOAD); + } + catch (ClassCastException e) { + throw InvalidInput.exception( + "Invalid value of %s[%s]. Please provide a comma-separated list of broadcast datasource names." + + " For example: [\"datasourceName1\", \"datasourceName2\"]", + CTX_BROADCAST_DATASOURCES_TO_LOAD, context.get(CTX_BROADCAST_DATASOURCES_TO_LOAD) + ); + } + + if (broadcastDatasourcesToLoad == null || broadcastDatasourcesToLoad.isEmpty()) { + throw InvalidInput.exception("Set of broadcast datasources to load cannot be %s for mode[ONLY_REQUIRED].", broadcastDatasourcesToLoad); + } + return BroadcastDatasourceLoadingSpec.loadOnly(new HashSet<>(broadcastDatasourcesToLoad)); + } else { + return defaultSpec; + } + } + + @Override + public String toString() + { + return "BroadcastDatasourceLoadingSpec{" + + "mode=" + mode + + ", broadcastDatasourcesToLoad=" + broadcastDatasourcesToLoad + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BroadcastDatasourceLoadingSpec that = (BroadcastDatasourceLoadingSpec) o; + return mode == that.mode && Objects.equals(broadcastDatasourcesToLoad, that.broadcastDatasourcesToLoad); + } + + @Override + public int hashCode() + { + return Objects.hash(mode, broadcastDatasourcesToLoad); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java index c5b71fbcddcf..7eec82e80b1c 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java @@ -39,12 +39,14 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -80,6 +82,8 @@ public class SegmentBootstrapper private static final EmittingLogger log = new EmittingLogger(SegmentBootstrapper.class); + private final DataSourceTaskIdHolder datasourceHolder; + @Inject public SegmentBootstrapper( SegmentLoadDropHandler loadDropHandler, @@ -89,7 +93,8 @@ public SegmentBootstrapper( SegmentManager segmentManager, ServerTypeConfig serverTypeConfig, CoordinatorClient coordinatorClient, - ServiceEmitter emitter + ServiceEmitter emitter, + DataSourceTaskIdHolder datasourceHolder ) { this.loadDropHandler = loadDropHandler; @@ -100,6 +105,7 @@ public SegmentBootstrapper( this.serverTypeConfig = serverTypeConfig; this.coordinatorClient = coordinatorClient; this.emitter = emitter; + this.datasourceHolder = datasourceHolder; } @LifecycleStart @@ -261,10 +267,18 @@ private void loadSegmentsOnStartup() throws IOException /** * @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned. + * The bootstrap segments returned are filtered by the broadcast datasources indicated by {@link DataSourceTaskIdHolder#getBroadcastDatasourceLoadingSpec()} + * if applicable. */ private List getBootstrapSegments() { - log.info("Fetching bootstrap segments from the coordinator."); + final BroadcastDatasourceLoadingSpec.Mode mode = datasourceHolder.getBroadcastDatasourceLoadingSpec().getMode(); + if (mode == BroadcastDatasourceLoadingSpec.Mode.NONE) { + log.info("Skipping fetch of bootstrap segments."); + return ImmutableList.of(); + } + + log.info("Fetching bootstrap segments from the coordinator with BroadcastDatasourceLoadingSpec mode[%s].", mode); final Stopwatch stopwatch = Stopwatch.createStarted(); List bootstrapSegments = new ArrayList<>(); @@ -272,7 +286,18 @@ private List getBootstrapSegments() try { final BootstrapSegmentsResponse response = FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true); - bootstrapSegments = ImmutableList.copyOf(response.getIterator()); + if (mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) { + final Set broadcastDatasourcesToLoad = datasourceHolder.getBroadcastDatasourceLoadingSpec().getBroadcastDatasourcesToLoad(); + final List filteredBroadcast = new ArrayList<>(); + response.getIterator().forEachRemaining(segment -> { + if (broadcastDatasourcesToLoad.contains(segment.getDataSource())) { + filteredBroadcast.add(segment); + } + }); + bootstrapSegments = filteredBroadcast; + } else { + bootstrapSegments = ImmutableList.copyOf(response.getIterator()); + } } catch (Exception e) { log.warn("Error fetching bootstrap segments from the coordinator: [%s]. ", e.getMessage()); @@ -284,7 +309,6 @@ private List getBootstrapSegments() emitter.emit(new ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count", bootstrapSegments.size())); log.info("Fetched [%d] bootstrap segments in [%d]ms.", bootstrapSegments.size(), fetchRunMillis); } - return bootstrapSegments; } diff --git a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java index 6d2dafd31a55..87002a5157f8 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java +++ b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java @@ -21,15 +21,16 @@ import com.google.inject.Inject; import com.google.inject.name.Named; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; -import javax.annotation.Nullable; - public class DataSourceTaskIdHolder { public static final String DATA_SOURCE_BINDING = "druidDataSource"; public static final String TASK_ID_BINDING = "druidTaskId"; public static final String LOOKUPS_TO_LOAD_FOR_TASK = "lookupsToLoadForTask"; + public static final String BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK = "broadcastDatasourcesToLoadForTask"; + @Named(DATA_SOURCE_BINDING) @Inject(optional = true) String dataSource = null; @@ -37,11 +38,14 @@ public class DataSourceTaskIdHolder @Inject(optional = true) String taskId = null; - @Nullable @Named(LOOKUPS_TO_LOAD_FOR_TASK) @Inject(optional = true) LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.ALL; + @Named(BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) + @Inject(optional = true) + BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec = BroadcastDatasourceLoadingSpec.ALL; + public String getDataSource() { return dataSource; @@ -56,4 +60,9 @@ public LookupLoadingSpec getLookupLoadingSpec() { return lookupLoadingSpec; } + + public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() + { + return broadcastDatasourceLoadingSpec; + } } diff --git a/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java new file mode 100644 index 000000000000..ddec0901965d --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.Set; + +@RunWith(JUnitParamsRunner.class) +public class BroadcastDatasourceLoadingSpecTest +{ + @Test + public void testLoadingAllBroadcastDatasources() + { + final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.ALL; + Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ALL, spec.getMode()); + Assert.assertNull(spec.getBroadcastDatasourcesToLoad()); + } + + @Test + public void testLoadingNoLookups() + { + final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.NONE; + Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.NONE, spec.getMode()); + Assert.assertNull(spec.getBroadcastDatasourcesToLoad()); + } + + @Test + public void testLoadingOnlyRequiredLookups() + { + final Set broadcastDatasourcesToLoad = ImmutableSet.of("ds1", "ds2"); + final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2")); + Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED, spec.getMode()); + Assert.assertEquals(broadcastDatasourcesToLoad, spec.getBroadcastDatasourcesToLoad()); + } + + @Test + public void testLoadingOnlyRequiredLookupsWithNullList() + { + DruidException exception = Assert.assertThrows(DruidException.class, () -> BroadcastDatasourceLoadingSpec.loadOnly(null)); + Assert.assertEquals("Expected non-null set of broadcast datasources to load.", exception.getMessage()); + } + + @Test + public void testCreateBroadcastLoadingSpecFromNullContext() + { + // Default spec is returned in the case of context=null. + Assert.assertEquals( + BroadcastDatasourceLoadingSpec.NONE, + BroadcastDatasourceLoadingSpec.createFromContext( + null, + BroadcastDatasourceLoadingSpec.NONE + ) + ); + + Assert.assertEquals( + BroadcastDatasourceLoadingSpec.ALL, + BroadcastDatasourceLoadingSpec.createFromContext( + null, + BroadcastDatasourceLoadingSpec.ALL + ) + ); + } + + @Test + public void testCreateBroadcastLoadingSpecFromContext() + { + // Only required lookups are returned in the case of context having the lookup keys. + Assert.assertEquals( + BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2")), + BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of( + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, Arrays.asList("ds1", "ds2"), + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED + ), + BroadcastDatasourceLoadingSpec.ALL + ) + ); + + // No lookups are returned in the case of context having mode=NONE, irrespective of the default spec. + Assert.assertEquals( + BroadcastDatasourceLoadingSpec.NONE, + BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of( + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.NONE), + BroadcastDatasourceLoadingSpec.ALL + ) + ); + + // All lookups are returned in the case of context having mode=ALL, irrespective of the default spec. + Assert.assertEquals( + BroadcastDatasourceLoadingSpec.ALL, + BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of(BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ALL), + BroadcastDatasourceLoadingSpec.NONE + ) + ); + } + + @Test + @Parameters( + { + "NONE1", + "A", + "Random mode", + "all", + "only required", + "none" + } + ) + public void testSpecFromInvalidModeInContext(final String mode) + { + final DruidException exception = Assert.assertThrows(DruidException.class, () -> BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of(BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, mode), BroadcastDatasourceLoadingSpec.ALL)); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Allowed values are [ALL, NONE, ONLY_REQUIRED]", + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, mode), exception.getMessage()); + } + + + @Test + @Parameters( + { + "foo bar", + "foo]" + } + ) + public void testSpecFromInvalidBroadcastDatasourcesInContext(final Object lookupsToLoad) + { + final DruidException exception = Assert.assertThrows(DruidException.class, () -> + BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of( + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, lookupsToLoad, + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED), + BroadcastDatasourceLoadingSpec.ALL) + ); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Please provide a comma-separated list of " + + "broadcast datasource names. For example: [\"datasourceName1\", \"datasourceName2\"]", + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, lookupsToLoad), exception.getMessage()); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java index 7629a6b875c8..187725317a21 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.TestSegmentUtils; +import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Before; @@ -137,7 +138,8 @@ public void testLoadStartStopWithEmptyLocations() throws IOException segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - emitter + emitter, + new DataSourceTaskIdHolder() ); bootstrapper.start(); @@ -164,7 +166,8 @@ public void testLoadStartStop() throws IOException segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - emitter + emitter, + new DataSourceTaskIdHolder() ); bootstrapper.start(); @@ -204,7 +207,8 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - emitter + emitter, + new DataSourceTaskIdHolder() ); bootstrapper.start(); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java index c41763f18245..fe1424e27005 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java @@ -20,13 +20,23 @@ package org.apache.druid.server.coordination; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Scopes; +import com.google.inject.name.Names; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.jackson.JacksonModule; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Before; @@ -125,7 +135,8 @@ public void testStartStop() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -184,7 +195,8 @@ public void testLoadCachedSegments() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -240,7 +252,8 @@ public void testLoadBootstrapSegments() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -267,6 +280,129 @@ public void testLoadBootstrapSegments() throws Exception bootstrapper.stop(); } + @Test + public void testLoadNoBootstrapSegments() throws Exception + { + final Set segments = new HashSet<>(); + for (int i = 0; i < COUNT; ++i) { + segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01"))); + segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02"))); + segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01"))); + segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02"))); + } + + Injector injector = Guice.createInjector( + new JacksonModule(), + new LifecycleModule(), + binder -> { + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + final BroadcastDatasourceLoadingSpec broadcastMode = BroadcastDatasourceLoadingSpec.NONE; + binder.bind(Key.get(BroadcastDatasourceLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) + .toInstance(broadcastMode); + } + ); + + final TestCoordinatorClient coordinatorClient = new TestCoordinatorClient(segments); + final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager(); + final SegmentManager segmentManager = new SegmentManager(cacheManager); + final SegmentLoadDropHandler handler = new SegmentLoadDropHandler( + segmentLoaderConfig, + segmentAnnouncer, + segmentManager + ); + final SegmentBootstrapper bootstrapper = new SegmentBootstrapper( + handler, + segmentLoaderConfig, + segmentAnnouncer, + serverAnnouncer, + segmentManager, + new ServerTypeConfig(ServerType.HISTORICAL), + coordinatorClient, + serviceEmitter, + injector.getInstance(DataSourceTaskIdHolder.class) + ); + + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); + + bootstrapper.start(); + + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); + + final ImmutableList expectedBootstrapSegments = ImmutableList.of(); + + Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + + bootstrapper.stop(); + } + + @Test + public void testLoadOnlyRequiredBootstrapSegments() throws Exception + { + final Set segments = new HashSet<>(); + final DataSegment ds1Segment1 = makeSegment("test1", "1", Intervals.of("P1D/2011-04-01")); + final DataSegment ds1Segment2 = makeSegment("test1", "1", Intervals.of("P1D/2012-04-01")); + final DataSegment ds2Segment1 = makeSegment("test2", "1", Intervals.of("P1d/2011-04-01")); + final DataSegment ds2Segment2 = makeSegment("test2", "1", Intervals.of("P1d/2012-04-01")); + segments.add(ds1Segment1); + segments.add(ds1Segment2); + segments.add(ds2Segment1); + segments.add(ds2Segment2); + + Injector injector = Guice.createInjector( + new JacksonModule(), + new LifecycleModule(), + binder -> { + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + final BroadcastDatasourceLoadingSpec broadcastMode = BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("test1")); + binder.bind(Key.get(BroadcastDatasourceLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) + .toInstance(broadcastMode); + } + ); + + final TestCoordinatorClient coordinatorClient = new TestCoordinatorClient(segments); + final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager(); + final SegmentManager segmentManager = new SegmentManager(cacheManager); + final SegmentLoadDropHandler handler = new SegmentLoadDropHandler( + segmentLoaderConfig, + segmentAnnouncer, + segmentManager + ); + final SegmentBootstrapper bootstrapper = new SegmentBootstrapper( + handler, + segmentLoaderConfig, + segmentAnnouncer, + serverAnnouncer, + segmentManager, + new ServerTypeConfig(ServerType.HISTORICAL), + coordinatorClient, + serviceEmitter, + injector.getInstance(DataSourceTaskIdHolder.class) + ); + + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); + + bootstrapper.start(); + + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); + Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty()); + Assert.assertEquals(ImmutableSet.of("test1"), segmentManager.getDataSourceNames()); + + final ImmutableList expectedBootstrapSegments = ImmutableList.of(ds1Segment2, ds1Segment1); + + Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size()); + serviceEmitter.verifyEmitted("segment/bootstrap/time", 1); + + bootstrapper.stop(); + } + @Test public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception { @@ -285,7 +421,8 @@ public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index eb572850cda2..ca8c4181cf96 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -123,6 +123,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.ResponseContextConfig; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.coordination.SegmentBootstrapper; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ZkCoordinator; @@ -176,12 +177,26 @@ public class CliPeon extends GuiceRunnable private boolean isZkEnabled = true; /** + *

    This option is deprecated, see {@link #loadBroadcastDatasourcesMode} option.

    + * * If set to "true", the peon will bind classes necessary for loading broadcast segments. This is used for * queryable tasks, such as streaming ingestion tasks. + * */ - @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments") + @Deprecated + @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", + description = "Enable loading of broadcast segments. This option is deprecated and will be removed in a" + + " future release. Use --loadBroadcastDatasourceMode instead.") public String loadBroadcastSegments = "false"; + /** + * Broadcast datasource loading mode. The peon will bind classes necessary required for loading broadcast segments if + * the mode is {@link BroadcastDatasourceLoadingSpec.Mode#ALL} or {@link BroadcastDatasourceLoadingSpec.Mode#ONLY_REQUIRED}. + */ + @Option(name = "--loadBroadcastDatasourceMode", title = "loadBroadcastDatasourceMode", + description = "Specify the broadcast datasource loading mode for the peon. Supported values are ALL, NONE, ONLY_REQUIRED.") + public String loadBroadcastDatasourcesMode = BroadcastDatasourceLoadingSpec.Mode.ALL.toString(); + @Option(name = "--taskId", title = "taskId", description = "TaskId for fetching task.json remotely") public String taskId = ""; @@ -274,7 +289,11 @@ public void configure(Binder binder) binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType))); LifecycleModule.register(binder, Server.class); - if ("true".equals(loadBroadcastSegments)) { + final BroadcastDatasourceLoadingSpec.Mode mode = + BroadcastDatasourceLoadingSpec.Mode.valueOf(loadBroadcastDatasourcesMode); + if ("true".equals(loadBroadcastSegments) + || mode == BroadcastDatasourceLoadingSpec.Mode.ALL + || mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) { binder.install(new BroadcastSegmentLoadingModule()); } } @@ -340,6 +359,14 @@ public LookupLoadingSpec getLookupsToLoad(final Task task) { return task.getLookupLoadingSpec(); } + + @Provides + @LazySingleton + @Named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) + public BroadcastDatasourceLoadingSpec getBroadcastDatasourcesToLoad(final Task task) + { + return task.getBroadcastDatasourceLoadingSpec(); + } }, new QueryablePeonModule(), new IndexingServiceInputSourceModule(),