Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
983d999
Add broker cache of coordinator dynamic configs
adarshsanjeev Apr 10, 2025
267f134
Clean resource
adarshsanjeev Apr 10, 2025
9e48c64
Refactor server selector
adarshsanjeev Apr 10, 2025
c449dc6
Make coordinator broadcast config changes
adarshsanjeev Apr 11, 2025
b5713d1
Add status APIs
adarshsanjeev Apr 11, 2025
aaa39e5
rename to CoordinatorDynamicConfigView
adarshsanjeev Apr 11, 2025
fa35902
Add cloneQueryMode enum
adarshsanjeev Apr 11, 2025
9789f1f
Cleanup
adarshsanjeev Apr 12, 2025
34e7d78
Cleanup
adarshsanjeev Apr 12, 2025
4263706
Rewrite status API
adarshsanjeev Apr 12, 2025
227ea1d
Rework coordinator changes
adarshsanjeev Apr 12, 2025
478ffed
Cleanup
adarshsanjeev Apr 12, 2025
76be7b5
Update with node discovery
adarshsanjeev Apr 14, 2025
b1f7e00
Tests
adarshsanjeev Apr 14, 2025
4acf4b7
Add docs
adarshsanjeev Apr 15, 2025
3e8b32e
Fix Benchmarks
adarshsanjeev Apr 15, 2025
91b0bab
Fix broker string
adarshsanjeev Apr 15, 2025
be8568e
Cleanup
adarshsanjeev Apr 15, 2025
bc53f0c
Fix tests
adarshsanjeev Apr 15, 2025
6383262
Merge remote-tracking branch 'origin/master' into broker-unmanaged-hi…
adarshsanjeev Apr 18, 2025
b6ebf32
Address review comments
adarshsanjeev Apr 18, 2025
270ceec
Add javadocs
adarshsanjeev Apr 19, 2025
0ae0686
Add test
adarshsanjeev Apr 21, 2025
25224c4
Refactor
adarshsanjeev Apr 21, 2025
dcaf0d0
Refactor
adarshsanjeev Apr 21, 2025
3f692d3
Refactor
adarshsanjeev Apr 21, 2025
77aec3f
Delete duty, move syncing to leadership start
adarshsanjeev Apr 21, 2025
6b0f141
Update APIs
adarshsanjeev Apr 21, 2025
bd8d026
Update docs
adarshsanjeev Apr 22, 2025
bc9906e
Add javadocs
adarshsanjeev Apr 23, 2025
a05e407
Refactor ServerSelector to contain HistoricalFilter
adarshsanjeev Apr 25, 2025
bc85133
Refactor to address comments
adarshsanjeev Apr 27, 2025
f475cf6
Refactor to address comments
adarshsanjeev Apr 27, 2025
cdda2e2
Update docs
adarshsanjeev Apr 27, 2025
5c7a32b
Clean up
adarshsanjeev Apr 28, 2025
d66242d
Increase retries while fetching coordinator dynamic config
adarshsanjeev Apr 28, 2025
96b77c9
Merge remote-tracking branch 'origin/master' into broker-unmanaged-hi…
adarshsanjeev Apr 28, 2025
5f82391
Fix test
adarshsanjeev Apr 28, 2025
902f18a
Move classes into separate files
adarshsanjeev Apr 28, 2025
455edf3
Rename enum
adarshsanjeev Apr 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import org.apache.druid.client.BrokerViewOfCoordinatorConfig;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidServer;
Expand All @@ -53,6 +54,7 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
Expand Down Expand Up @@ -100,8 +102,11 @@
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.ResourceIdPopulatingQueryRunner;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.TestCoordinatorClient;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
Expand Down Expand Up @@ -176,6 +181,7 @@ public class CachingClusteredClientBenchmark
private final QuerySegmentSpec basicSchemaIntervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(basicSchema.getDataInterval())
);
private final BrokerViewOfCoordinatorConfig filter = new BrokerViewOfCoordinatorConfig(new TestCoordinatorClient());

private final int numProcessingThreads = 4;

Expand Down Expand Up @@ -222,6 +228,7 @@ public void setup()
rowsPerSegment
);
queryableIndexes.put(dataSegment, index);
filter.start();
}

final DruidProcessingConfig processingConfig = new DruidProcessingConfig()
Expand Down Expand Up @@ -368,18 +375,21 @@ public void tearDown() throws IOException
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void timeseriesQuery(Blackhole blackhole)
{
query = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(basicSchemaIntervalSpec)
.aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.granularity(Granularity.fromString(queryGranularity))
.context(
ImmutableMap.of(
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();
Query<?> q = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(basicSchemaIntervalSpec)
.aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.granularity(Granularity.fromString(queryGranularity))
.context(
ImmutableMap.of(
BaseQuery.QUERY_ID, "BenchmarkQuery",
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();

query = prepareQuery(q);

final List<Result<TimeseriesResultValue>> results = runQuery();

Expand All @@ -393,7 +403,7 @@ public void timeseriesQuery(Blackhole blackhole)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void topNQuery(Blackhole blackhole)
{
query = new TopNQueryBuilder()
Query<?> q = new TopNQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(basicSchemaIntervalSpec)
.dimension(new DefaultDimensionSpec("dimZipf", null))
Expand All @@ -403,12 +413,15 @@ public void topNQuery(Blackhole blackhole)
.threshold(10_000) // we are primarily measuring 'broker' merge time, so collect a significant number of results
.context(
ImmutableMap.of(
BaseQuery.QUERY_ID, "BenchmarkQuery",
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();

query = prepareQuery(q);

final List<Result<TopNResultValue>> results = runQuery();

for (Result<TopNResultValue> result : results) {
Expand All @@ -421,7 +434,7 @@ public void topNQuery(Blackhole blackhole)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void groupByQuery(Blackhole blackhole)
{
query = GroupByQuery
Query<?> q = GroupByQuery
.builder()
.setDataSource(DATA_SOURCE)
.setQuerySegmentSpec(basicSchemaIntervalSpec)
Expand All @@ -433,19 +446,33 @@ public void groupByQuery(Blackhole blackhole)
.setGranularity(Granularity.fromString(queryGranularity))
.setContext(
ImmutableMap.of(
BaseQuery.QUERY_ID, "BenchmarkQuery",
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();

query = prepareQuery(q);

final List<ResultRow> results = runQuery();

for (ResultRow result : results) {
blackhole.consume(result);
}
}

/**
 * Populates the given query with the identifiers it needs before execution: a query resource id
 * (via {@link ResourceIdPopulatingQueryRunner#populateResourceId}) and generated subquery ids on
 * the data source (via {@link ClientQuerySegmentWalker#generateSubqueryIds}).
 *
 * NOTE(review): presumably this mirrors the preparation the Broker performs on incoming queries,
 * so the benchmark exercises the same merge path as production — confirm against
 * ClientQuerySegmentWalker.
 *
 * @param query the benchmark query to prepare; not mutated — a prepared copy is returned
 * @return a copy of {@code query} with the resource id set and subquery ids generated
 */
private <T> Query<T> prepareQuery(Query<T> query)
{
return ResourceIdPopulatingQueryRunner.populateResourceId(query)
.withDataSource(ClientQuerySegmentWalker.generateSubqueryIds(
query.getDataSource(),
query.getId(),
query.getSqlQueryId(),
query.context().getString(QueryContexts.QUERY_RESOURCE_ID)
));
}

private <T> List<T> runQuery()
{
//noinspection unchecked
Expand Down Expand Up @@ -496,7 +523,7 @@ void addSegmentToServer(DruidServer server, DataSegment segment)
{
final ServerSelector selector = selectors.computeIfAbsent(
segment.getId().toString(),
k -> new ServerSelector(segment, tierSelectorStrategy)
k -> new ServerSelector(segment, tierSelectorStrategy, filter)
);
selector.addServerAndUpdateSegment(servers.get(server), segment);
timelines.computeIfAbsent(segment.getDataSource(), k -> new VersionedIntervalTimeline<>(Ordering.natural()))
Expand Down
134 changes: 133 additions & 1 deletion docs/api-reference/service-status-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,138 @@ Host: http://COORDINATOR_IP:COORDINATOR_PORT

</details>


### Get Historical Cloning Status

Retrieves the current status of Historical cloning from the Coordinator.

#### URL

`GET` `/druid/coordinator/v1/config/cloneStatus`

#### Responses

<Tabs>

<TabItem value="56" label="200 SUCCESS">


<br/>

*Successfully retrieved cloning status*

</TabItem>
</Tabs>

#### Sample request

<Tabs>

<TabItem value="58" label="cURL">


```shell
curl "http://COORDINATOR_IP:COORDINATOR_PORT/druid/coordinator/v1/config/cloneStatus"
```

</TabItem>
<TabItem value="59" label="HTTP">


```http
GET /druid/coordinator/v1/config/cloneStatus HTTP/1.1
Host: http://COORDINATOR_IP:COORDINATOR_PORT
```

</TabItem>
</Tabs>

#### Sample response

<details>
<summary>View the response</summary>

```json
{
"cloneStatus": [
{
"sourceServer": "localhost:8089",
"targetServer": "localhost:8083",
"state": "IN_PROGRESS",
"segmentLoadsRemaining": 0,
"segmentDropsRemaining": 0,
"bytesToLoad": 0
}
]
}
```

</details>

### Get Broker dynamic configuration view

Retrieves the list of Brokers which have an up-to-date view of Coordinator dynamic configuration.

#### URL

`GET` `/druid/coordinator/v1/config/syncedBrokers`

#### Responses

<Tabs>

<TabItem value="56" label="200 SUCCESS">


<br/>

*Successfully retrieved the Broker configuration view*

</TabItem>
</Tabs>

#### Sample request

<Tabs>

<TabItem value="58" label="cURL">


```shell
curl "http://COORDINATOR_IP:COORDINATOR_PORT/druid/coordinator/v1/config/syncedBrokers"
```

</TabItem>
<TabItem value="59" label="HTTP">


```http
GET /druid/coordinator/v1/config/syncedBrokers HTTP/1.1
Host: http://COORDINATOR_IP:COORDINATOR_PORT
```

</TabItem>
</Tabs>

#### Sample response

<details>
<summary>View the response</summary>

```json
{
"syncedBrokers": [
{
"host": "localhost",
"port": 8082,
"syncTimeInMs": 1745756337472
}
]
}
```

</details>

## Overlord

### Get Overlord leader address
Expand Down Expand Up @@ -1334,4 +1466,4 @@ Host: http://BROKER_IP:BROKER_PORT

#### Sample response

A successful response to this endpoint results in an empty response body.
A successful response to this endpoint results in an empty response body.
1 change: 1 addition & 0 deletions docs/querying/query-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ See [SQL query context](sql-query-context.md) for other query context parameters
|`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:<br />- Log the stack trace of the exception (if any) produced by the query |
|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.|
|`cloneQueryMode`|`excludeClones`| Determines whether clone Historicals are queried by Brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones`, and `clonesPreferred`. `excludeClones` means that the Broker does not query clone Historicals. `clonesPreferred` means that when choosing between a clone Historical and the original Historical being cloned, the Broker prefers the clone; Historicals not involved in the cloning process are still queried. `includeClones` means that the Broker queries any Historical regardless of clone status. This parameter only affects native queries; MSQ does not query Historicals directly, and Dart does not respect this context parameter.|
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow up: Use preferClones instead of clonesPreferred.


## Parameters by query type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.druid.client.BatchServerInventoryView;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.BrokerServerView;
import org.apache.druid.client.BrokerViewOfCoordinatorConfig;
import org.apache.druid.client.DirectDruidClientFactory;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.TestCoordinatorClient;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
Expand Down Expand Up @@ -319,12 +321,15 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
EasyMock.createMock(HttpClient.class)
);

BrokerViewOfCoordinatorConfig filter = new BrokerViewOfCoordinatorConfig(new TestCoordinatorClient());
filter.start();
brokerServerView = new BrokerServerView(
druidClientFactory,
baseView,
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
new NoopServiceEmitter(),
new BrokerSegmentWatcherConfig()
new BrokerSegmentWatcherConfig(),
filter
);
baseView.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ public long getMaxQueuedBytes()
return 0L;
}
};

CachingClusteredClient baseClient = new CachingClusteredClient(
conglomerate,
new TimelineServerView()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.query.CloneQueryMode;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.server.coordination.DruidServerMetadata;
Expand Down Expand Up @@ -162,7 +163,8 @@ public List<InputSlice> sliceDynamic(
*/
int findWorkerForServerSelector(final ServerSelector serverSelector, final int maxNumSlices)
{
final QueryableDruidServer server = serverSelector.pick(null);
// Currently, Dart does not support clone query modes, so all servers can be queried.
final QueryableDruidServer server = serverSelector.pick(null, CloneQueryMode.INCLUDECLONES);

if (server == null) {
return UNKNOWN;
Expand Down Expand Up @@ -279,7 +281,8 @@ static boolean shouldIncludeSegment(final ServerSelector serverSelector)
int numRealtimeServers = 0;
int numOtherServers = 0;

for (final DruidServerMetadata server : serverSelector.getAllServers()) {
// Currently, Dart does not support clone query modes, so all servers can be queried.
for (final DruidServerMetadata server : serverSelector.getAllServers(CloneQueryMode.INCLUDECLONES)) {
if (SegmentSource.REALTIME.getUsedServerTypes().contains(server.getType())) {
numRealtimeServers++;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.client.QueryableDruidServer;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.HistoricalFilter;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.data.input.StringTuple;
Expand Down Expand Up @@ -214,7 +215,8 @@ void setUp()
final IntList segmentServers = entry.getValue();
final ServerSelector serverSelector = new ServerSelector(
dataSegment,
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
HistoricalFilter.IDENTITY_FILTER
);
for (int serverNumber : segmentServers) {
final DruidServerMetadata serverMetadata = SERVERS.get(serverNumber);
Expand Down
Loading
Loading