Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DruidK8sConstants
public static final String TASK_JSON_ENV = "TASK_JSON";
public static final String TASK_DIR_ENV = "TASK_DIR";
public static final String TASK_ID_ENV = "TASK_ID";
public static final String LOAD_BROADCAST_DATASOURCE_MODE_ENV = "LOAD_BROADCAST_DATASOURCE_MODE";
public static final String LOAD_BROADCAST_SEGMENTS_ENV = "LOAD_BROADCAST_SEGMENTS";
public static final String JAVA_OPTS = "JAVA_OPTS";
public static final String DRUID_HOST_ENV = "druid_host";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,16 @@ private List<String> generateCommand(Task task)
}

// If the task type is queryable, we need to load broadcast segments on the peon, used for
// join queries
// join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here
// for backwards compatibility and can be removed in a future release.
if (task.supportsQueries()) {
command.add("--loadBroadcastSegments");
command.add("true");
}

command.add("--loadBroadcastDatasourceMode");
command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString());

command.add("--taskId");
command.add(task.getId());
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ private Collection<EnvVar> getEnv(Task task) throws IOException
.withName(DruidK8sConstants.TASK_ID_ENV)
.withValue(task.getId())
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV)
.withValue(task.getBroadcastDatasourceLoadingSpec().getMode().toString())
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV)
.withValue(Boolean.toString(task.supportsQueries()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.druid.k8s.overlord.execution.Selector;
import org.apache.druid.k8s.overlord.execution.SelectorBasedPodTemplateSelectStrategy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.tasklogs.TaskLogs;
import org.easymock.EasyMock;
import org.junit.Assert;
Expand Down Expand Up @@ -537,6 +538,7 @@ public void test_fromTask_taskSupportsQueries() throws IOException
EasyMock.expect(task.getId()).andReturn("id").anyTimes();
EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes();
EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes();

EasyMock.replay(task);
Job actual = adapter.fromTask(task);
Expand All @@ -550,7 +552,46 @@ public void test_fromTask_taskSupportsQueries() throws IOException
}

@Test
public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperites() throws IOException
public void test_fromTask_withBroadcastDatasourceLoadingModeAll() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("noop.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);

Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
props.setProperty("druid.indexer.runner.k8s.podTemplate.queryable", templatePath.toString());

PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
taskRunnerConfig,
taskConfig,
node,
mapper,
props,
taskLogs,
dynamicConfigRef
);

Task task = EasyMock.mock(Task.class);
EasyMock.expect(task.supportsQueries()).andReturn(true);
EasyMock.expect(task.getType()).andReturn("queryable").anyTimes();
EasyMock.expect(task.getId()).andReturn("id").anyTimes();
EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes();
EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes();

EasyMock.replay(task);
Job actual = adapter.fromTask(task);
EasyMock.verify(task);

Assertions.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ALL.toString(), actual.getSpec().getTemplate()
.getSpec().getContainers()
.get(0).getEnv().stream()
.filter(env -> env.getName().equals(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV))
.collect(Collectors.toList()).get(0).getValue());
}

@Test
public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperties() throws IOException
{
Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
image: one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
Expand Down Expand Up @@ -374,4 +375,10 @@ public LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.NONE;
}

@Override
public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
{
return BroadcastDatasourceLoadingSpec.NONE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
Expand Down Expand Up @@ -104,6 +105,22 @@ public void testGetDefaultLookupLoadingSpec()
Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec());
}

@Test
public void testGetDefaultBroadcastDatasourceLoadingSpec()
{
MSQControllerTask controllerTask = new MSQControllerTask(
null,
MSQ_SPEC,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(BroadcastDatasourceLoadingSpec.NONE, controllerTask.getBroadcastDatasourceLoadingSpec());
}

@Test
public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -412,6 +413,12 @@ public LookupLoadingSpec getLookupLoadingSpec()
return LookupLoadingSpec.NONE;
}

@Override
public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
{
return BroadcastDatasourceLoadingSpec.NONE;
}

@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -327,9 +327,18 @@ static TaskInfo<TaskIdentifier, TaskStatus> toTaskIdentifierInfo(TaskInfo<Task,
* This behaviour can be overridden by passing parameters {@link LookupLoadingSpec#CTX_LOOKUP_LOADING_MODE}
* and {@link LookupLoadingSpec#CTX_LOOKUPS_TO_LOAD} in the task context.
*/
@Nullable
default LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.createFromContext(getContext(), LookupLoadingSpec.ALL);
}

/**
* Specifies the list of broadcast datasources to load for this task. Tasks load ALL broadcast datasources by default.
* This behavior can be overridden by passing parameters {@link BroadcastDatasourceLoadingSpec#CTX_BROADCAST_DATASOURCE_LOADING_MODE}
* and {@link BroadcastDatasourceLoadingSpec#CTX_BROADCAST_DATASOURCES_TO_LOAD} in the task context.
*/
default BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
{
return BroadcastDatasourceLoadingSpec.createFromContext(getContext(), BroadcastDatasourceLoadingSpec.ALL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,16 @@ public TaskStatus call()
}

// If the task type is queryable, we need to load broadcast segments on the peon, used for
// join queries
// join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here
// for backwards compatibility and can be removed in a future release.
if (task.supportsQueries()) {
command.add("--loadBroadcastSegments");
command.add("true");
}

command.add("--loadBroadcastDatasourceMode");
command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString());

if (!taskFile.exists()) {
jsonMapper.writeValue(taskFile, task);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.timeline.DataSegment;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -601,6 +602,16 @@ public void testGetLookupsToLoad()
Assert.assertEquals(LookupLoadingSpec.Mode.NONE, task.getLookupLoadingSpec().getMode());
}

@Test
public void testGetBroadcastDatasourcesToLoad()
{
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
.dataSource(DATA_SOURCE)
.interval(Intervals.of("2019-03-01/2019-04-01"))
.build();
Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.NONE, task.getBroadcastDatasourceLoadingSpec().getMode());
}

@Test
public void testKillBatchSizeOneAndLimit4() throws Exception
{
Expand Down
Loading