Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,10 @@ public String getType()
{
return TYPE;
}

@Override
public boolean supportsQueries()
{
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ public void testRunAfterDataInserted() throws Exception
INPUT_FORMAT
)
);
Assert.assertTrue(task.supportsQueries());

final ListenableFuture<TaskStatus> future = runTask(task);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ public String getType()
return TYPE;
}

@Override
public boolean supportsQueries()
{
return true;
}

@VisibleForTesting
AWSCredentialsConfig getAwsCredentialsConfig()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ public void testRunAfterDataInserted() throws Exception
null,
false
)

);
Assert.assertTrue(task.supportsQueries());

final ListenableFuture<TaskStatus> future = runTask(task);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ public <T> QueryRunner<T> getQueryRunner(Query<T> query)
return null;
}

@Override
public boolean supportsQueries()
{
return false;
}

@Override
public String getClasspathPrefix()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ public <T> QueryRunner<T> getQueryRunner(Query<T> query)
return (queryPlus, responseContext) -> queryPlus.run(appenderator, responseContext);
}

@Override
public boolean supportsQueries()
{
return true;
}

@Override
public boolean isReady(TaskActionClient taskActionClient)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ public <T> QueryRunner<T> getQueryRunner(Query<T> query)
return plumber.getQueryRunner(query);
}

@Override
public boolean supportsQueries()
{
return true;
}

@Override
public boolean isReady(TaskActionClient taskActionClient)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ default int getPriority()
*/
<T> QueryRunner<T> getQueryRunner(Query<T> query);

/**
* @return true if this Task type is queryable, such as streaming ingestion tasks
*/
boolean supportsQueries();

/**
* Returns an extra classpath that should be prepended to the default classpath when running this task. If no
* extra classpath should be prepended, this should return null or the empty string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,13 @@ public TaskStatus call()
command.add(nodeType);
}

// If the task type is queryable, we need to load broadcast segments on the peon, used for
// join queries
if (task.supportsQueries()) {
command.add("--loadBroadcastSegments");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the CI failure doesn't like missing tests for this change. Thinking about it for a bit, what will happen if the peon always loads the stuffs for loading broadcast segments? If I'm reading the code correctly, the coordinator will not broadcast segments to batch tasks no matter whether they are running on neither middleManagers nor indexers. The realtime tasks announce themselves when they run on middleManagers or don't announce when they run on indexers, but the batch tasks never announce themselves. So, it seems like, if the peon always loaded the stuffs for broadcast segment loading, those stuffs could be loaded for batch tasks as well but wouldn't do anything? If that's the case, I'm fine with always loading them rather than adding this new parameter and tests for it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I saw the batch tasks loading things from segment cache, which I was trying to avoid right now since they could be loaded on-heap for indexed tables, for performance/memory load reasons.

For the coverage, I'm thinking we should ignore the coverage failure on ForkingTaskRunner and CliPeon for this PR, does that sound okay to you?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, the segment cache is a good point. It makes sense.

For the coverage, I'm thinking we should ignore the coverage failure on ForkingTaskRunner and CliPeon for this PR, does that sound okay to you?

I think it's fine.

command.add("true");
}

if (!taskFile.exists()) {
jsonMapper.writeValue(taskFile, task);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ public void testBasics() throws Exception
{
expectPublishedSegments(1);
final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null);
Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> statusFuture = runTask(task);

// Wait for firehose to show up, it starts off null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ public void testDeterminePartitions() throws Exception
appenderatorsManager
);

Assert.assertFalse(indexTask.supportsQueries());

final List<DataSegment> segments = runTask(indexTask).rhs;

Assert.assertEquals(2, segments.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ public void testDefaultResource()
Assert.assertEquals(task.getId(), task.getTaskResource().getAvailabilityGroup());
}

@Test(timeout = 60_000L)
public void testSupportsQueries()
{
final RealtimeIndexTask task = makeRealtimeTask(null);
Assert.assertTrue(task.supportsQueries());
}

@Test(timeout = 60_000L, expected = ExecutionException.class)
public void testHandoffTimeout() throws Exception
Expand Down
4 changes: 0 additions & 4 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,6 @@
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ private String computeCurrentEtag(final Set<SegmentServerSelector> segments, @Nu
Hasher hasher = Hashing.sha1().newHasher();
boolean hasOnlyHistoricalSegments = true;
for (SegmentServerSelector p : segments) {
if (!p.getServer().pick().getServer().segmentReplicatable()) {
if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) {
hasOnlyHistoricalSegments = false;
break;
}
Expand Down Expand Up @@ -633,7 +633,7 @@ private void addSequencesFromServer(

if (isBySegment) {
serverResults = getBySegmentServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);
} else if (!server.segmentReplicatable() || !populateCache) {
} else if (!server.isSegmentReplicationTarget() || !populateCache) {
serverResults = getSimpleServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);
} else {
serverResults = getAndCacheServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);
Expand Down
14 changes: 12 additions & 2 deletions server/src/main/java/org/apache/druid/client/DruidServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,19 @@ public String getTier()
return metadata.getTier();
}

public boolean segmentReplicatable()
public boolean isSegmentReplicationTarget()
{
return metadata.segmentReplicatable();
return metadata.isSegmentReplicationTarget();
}

public boolean isSegmentBroadcastTarget()
{
return metadata.isSegmentBroadcastTarget();
}

public boolean isSegmentReplicationOrBroadcastTarget()
{
return metadata.isSegmentReplicationTarget() || metadata.isSegmentBroadcastTarget();
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public abstract class DruidNodeDiscoveryProvider
private static final Map<String, Set<NodeRole>> SERVICE_TO_NODE_TYPES = ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY,
ImmutableSet.of(NodeRole.BROKER, NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER),
DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER),
DataNodeService.DISCOVERY_SERVICE_KEY,
ImmutableSet.of(NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER, NodeRole.BROKER),
WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.MIDDLE_MANAGER, NodeRole.INDEXER)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.druid.utils.JvmUtils;
import org.hibernate.validator.constraints.NotEmpty;

import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand All @@ -34,8 +34,7 @@
public class SegmentLoaderConfig
{
@JsonProperty
@NotEmpty
private List<StorageLocationConfig> locations = null;
private List<StorageLocationConfig> locations = Collections.emptyList();

@JsonProperty("lazyLoadOnStart")
private boolean lazyLoadOnStart = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public SegmentLoaderLocalCacheManager(
this.indexIO = indexIO;
this.config = config;
this.jsonMapper = mapper;

this.locations = new ArrayList<>();
for (StorageLocationConfig locationConfig : config.getLocations()) {
locations.add(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, Segm
&& segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
== descriptor.getPartitionNumber()
&& segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
&& segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::segmentReplicatable)) {
&& segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::isSegmentReplicationOrBroadcastTarget)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,21 @@ public int getPriority()
return priority;
}

public boolean segmentReplicatable()
public boolean isSegmentReplicationTarget()
{
return type.isSegmentReplicationTarget();
}

public boolean isSegmentBroadcastTarget()
{
return type.isSegmentBroadcastTarget();
}

public boolean isSegmentReplicationOrBroadcastTarget()
{
return isSegmentReplicationTarget() || isSegmentBroadcastTarget();
}

@Override
public boolean equals(Object o)
{
Expand Down
Loading