Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f57cf70
Initial support for bootstrap segments.
abhishekrb19 Jun 14, 2024
a363511
Fail open by default if there are any errors talking to the coordinator.
abhishekrb19 Jun 14, 2024
97e9734
Add test for failure scenario and cleanup logs.
abhishekrb19 Jun 15, 2024
a4ed4b1
Cleanup and add debug log
abhishekrb19 Jun 17, 2024
8ec621a
Merge branch 'master' into bootstrap_segments_load_api
abhishekrb19 Jun 17, 2024
7459a8a
Assert the events so we know the list exactly.
abhishekrb19 Jun 17, 2024
f0b874a
Revert RunRules test.
abhishekrb19 Jun 17, 2024
d2a1817
Revert RunRulesTest too.
abhishekrb19 Jun 17, 2024
e10565d
Merge branch 'master' into bootstrap_segments_load_api
abhishekrb19 Jun 17, 2024
6a85f99
Remove debug info.
abhishekrb19 Jun 17, 2024
a8316da
Make the API POST and update log.
abhishekrb19 Jun 18, 2024
cbd4c4e
Fix up UTs.
abhishekrb19 Jun 18, 2024
9086f88
Throw 503 from MetadataResource; clean up exception handling and Drui…
abhishekrb19 Jun 18, 2024
e090da9
Remove unused logger, add verification of metrics and docs.
abhishekrb19 Jun 18, 2024
a88f6f6
Merge branch 'master' into bootstrap_segments_load_api
abhishekrb19 Jun 19, 2024
e8e2a21
Update error message
abhishekrb19 Jun 20, 2024
05dab3a
Update server/src/main/java/org/apache/druid/server/coordination/Segm…
abhishekrb19 Jun 21, 2024
98e2fa6
Apply suggestions from code review
abhishekrb19 Jun 21, 2024
3a26f0f
Adjust test metric expectations with the rename.
abhishekrb19 Jun 21, 2024
860d813
Add BootstrapSegmentResponse container in the response for future ext…
abhishekrb19 Jun 21, 2024
db1614b
Merge branch 'master' into bootstrap_segments_load_api
abhishekrb19 Jun 21, 2024
cf525fa
Rename to BootstrapSegmentsInfo for internal consistency.
abhishekrb19 Jun 21, 2024
04793b6
Remove unused log.
abhishekrb19 Jun 21, 2024
b8680d8
Use a member variable for broadcast segments instead of segmentAssigner.
abhishekrb19 Jun 21, 2024
01b81d8
Minor cleanup
abhishekrb19 Jun 21, 2024
7bcf303
Merge branch 'master' into bootstrap_segments_load_api
abhishekrb19 Jun 21, 2024
a6ad9fe
Add test for loadable bootstrap segments and clarify comment.
abhishekrb19 Jun 21, 2024
43b8702
Review suggestions.
abhishekrb19 Jun 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/api-reference/legacy-metadata-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ Returns a list of all segments, overlapping with any of given intervals, for a d

`POST /druid/coordinator/v1/metadata/dataSourceInformation`

Returns information about the specified datasources, including the datasource schema.
Returns information about the specified datasources, including the datasource schema.

`POST /druid/coordinator/v1/metadata/bootstrapSegments`

Returns information about bootstrap segments for all datasources. The returned set includes all broadcast segments if broadcast rules are configured.

<a name="coordinator-datasources"></a>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,19 +331,19 @@ public enum Persona
}

/**
* Category of error. The simplest way to describe this is that it exists as a classification of errors that
* Category of error. The simplest way to describe this is that it exists as a classification of errors that
* enables us to identify the expected response code (e.g. HTTP status code) of a specific DruidException
*/
public enum Category
{
/**
* Means that the exception is being created defensively, because we want to validate something but expect that
* it should never actually be hit. Using this category is good to provide an indication to future reviewers and
* it should never actually be hit. Using this category is good to provide an indication to future reviewers and
* developers that the case being checked is not intended to actually be able to occur in the wild.
*/
DEFENSIVE(500),
/**
* Means that the input provided was malformed in some way. Generally speaking, it is hoped that errors of this
* Means that the input provided was malformed in some way. Generally speaking, it is hoped that errors of this
* category have messages written either targeting the USER or ADMIN personas as those are the general users
* of the APIs who could generate invalid inputs.
*/
Expand All @@ -356,9 +356,8 @@ public enum Category
* Means that an action that was attempted is forbidden
*/
FORBIDDEN(403),

/**
* Means that the requsted requested resource cannot be found.
* Means that the requested resource cannot be found.
*/
NOT_FOUND(404),
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client;

import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.timeline.DataSegment;

/**
 * Response container for a bootstrap segments fetch. It currently carries a single value: a
 * {@link CloseableIterator} over the {@link DataSegment}s that were returned.
 *
 * NOTE(review): the iterator is closeable; presumably whoever consumes it is expected to close it -- confirm
 * against the callers.
 */
public class BootstrapSegmentsResponse
{
  /** Iterator over the segments carried by this response; handed out as-is by {@link #getIterator()}. */
  private final CloseableIterator<DataSegment> segmentIterator;

  /**
   * @param iterator iterator over the segments of this response; stored as given, without copying or validation
   */
  public BootstrapSegmentsResponse(final CloseableIterator<DataSegment> iterator)
  {
    this.segmentIterator = iterator;
  }

  /**
   * @return the same iterator instance that was passed to the constructor
   */
  public CloseableIterator<DataSegment> getIterator()
  {
    return this.segmentIterator;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ public JsonParserIterator(
this.hasTimeout = timeoutAt > -1;
}

/**
 * Bypasses Jackson serialization to prevent materialization of results from the {@code future} in memory at once.
 * This is a shorthand for {@link #JsonParserIterator(JavaType, Future, String, Query, String, ObjectMapper)}
 * for callers that do not know the URL and host of the request. Both are passed as empty strings, which is
 * acceptable because they are used solely for logging/errors; the query is passed as {@code null}.
 *
 * @param typeRef      type of the elements to be read from the stream
 * @param future       future providing the {@link InputStream} to parse
 * @param objectMapper mapper handed to the full constructor
 */
public JsonParserIterator(JavaType typeRef, Future<InputStream> future, ObjectMapper objectMapper)
{
  this(
      typeRef,
      future,
      "", // URL is not known here; only used for logs/errors
      null, // no query is associated with this iterator
      "", // host is not known here; only used for logs/errors
      objectMapper
  );
}

@Override
public boolean hasNext()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.client.coordinator;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.rpc.ServiceRetryPolicy;
Expand Down Expand Up @@ -58,6 +59,12 @@ public interface CoordinatorClient
*/
ListenableFuture<List<DataSourceInformation>> fetchDataSourceInformation(Set<String> datasources);

/**
 * Fetch bootstrap segments from the coordinator. The results must be streamed back to the caller as the
 * result set can be large.
 *
 * NOTE(review): the response exposes a closeable iterator; presumably the caller is responsible for closing it
 * once done -- confirm against implementations and callers.
 *
 * @return a future that resolves to a {@link BootstrapSegmentsResponse}, whose iterator yields the fetched segments
 */
ListenableFuture<BootstrapSegmentsResponse> fetchBootstrapSegments();

/**
* Returns a new instance backed by a ServiceClient which follows the provided retryPolicy
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.metadata.DataSourceInformation;
import org.apache.druid.server.coordination.LoadableDataSegment;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
Expand Down Expand Up @@ -156,6 +161,28 @@ public ListenableFuture<List<DataSourceInformation>> fetchDataSourceInformation(
);
}

/**
 * Sends a {@code POST} to {@code /druid/coordinator/v1/metadata/bootstrapSegments} and adapts the response
 * body, received as an input stream, into a {@link BootstrapSegmentsResponse} backed by a
 * {@link JsonParserIterator}.
 *
 * @return a future that resolves to the response wrapping an iterator over the returned segments
 */
@Override
public ListenableFuture<BootstrapSegmentsResponse> fetchBootstrapSegments()
{
  final RequestBuilder bootstrapRequest = new RequestBuilder(
      HttpMethod.POST,
      "/druid/coordinator/v1/metadata/bootstrapSegments"
  );

  return FutureUtils.transform(
      client.asyncRequest(bootstrapRequest, new InputStreamResponseHandler()),
      responseStream -> {
        // Some servers, like the Broker, may have PruneLoadSpec set to true for optimization reasons.
        // Elements are therefore read as LoadableDataSegment rather than DataSegment, so that callers can
        // still correctly load the bootstrap segments: the load specs are guaranteed not to be pruned.
        return new BootstrapSegmentsResponse(
            new JsonParserIterator<>(
                jsonMapper.getTypeFactory().constructType(LoadableDataSegment.class),
                // The stream is already available here, so it is wrapped in an already-completed future.
                Futures.immediateFuture(responseStream),
                jsonMapper
            )
        );
      }
  );
}

@Override
public CoordinatorClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,6 @@ private <T> JsonParserIterator<T> asJsonParserIterator(final InputStream in, fin
return new JsonParserIterator<>(
jsonMapper.getTypeFactory().constructType(clazz),
Futures.immediateFuture(in),
"", // We don't know URL at this point, but it's OK to use empty; it's used for logs/errors
Comment thread
kfaraz marked this conversation as resolved.
null,
"", // We don't know host at this point, but it's OK to use empty; it's used for logs/errors
jsonMapper
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.segment.loading;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.io.Files;
import org.apache.druid.java.util.common.FileUtils;
Expand Down Expand Up @@ -117,12 +116,6 @@ public boolean delete()

private static final Logger log = new Logger(LocalDataSegmentPuller.class);

@VisibleForTesting
public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException
{
getSegmentFiles(getFile(segment), dir);
}

public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException
{
if (sourceFile.isDirectory()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.server.coordination;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
Expand Down Expand Up @@ -59,8 +58,7 @@ public LoadableDataSegment(
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
@JacksonInject PruneSpecsHolder pruneSpecsHolder
@JsonProperty("size") long size
)
{
super(
Expand Down
Loading