From 4dc2ddf77494440c62ebbcf1052902055a988d41 Mon Sep 17 00:00:00 2001 From: Tarun Kancherla Date: Wed, 29 Jan 2025 22:54:13 +0530 Subject: [PATCH 1/2] Migrated from deprecated BrokerClient to new BrokerClient --- .../msq/exec/SegmentLoadStatusFetcher.java | 942 +++++++++--------- 1 file changed, 467 insertions(+), 475 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java index d4eaef600125..941a05fd2346 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java @@ -17,478 +17,470 @@ * under the License. */ -package org.apache.druid.msq.exec; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.discovery.BrokerClient; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.sql.http.ResultFormat; -import org.apache.druid.sql.http.SqlQuery; -import org.apache.druid.timeline.DataSegment; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.joda.time.DateTime; -import org.joda.time.Interval; - -import javax.annotation.Nullable; -import javax.ws.rs.core.MediaType; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Class that periodically checks with the broker if all the segments generated are loaded by querying the sys table - * and blocks till it is complete. This will account for and not wait for segments that would never be loaded due to - * load rules. Should only be called if the query generates new segments or tombstones. - *
- * If an exception is thrown during operation, this will simply log the exception and exit without failing the task, - * since the segments have already been published successfully, and should be loaded eventually. - *
- * If the segments are not loaded within {@link #TIMEOUT_DURATION_MILLIS} milliseconds, this logs a warning and exits - * for the same reason. - */ -public class SegmentLoadStatusFetcher implements AutoCloseable -{ - private static final Logger log = new Logger(SegmentLoadStatusFetcher.class); - private static final long SLEEP_DURATION_MILLIS = TimeUnit.SECONDS.toMillis(5); - private static final long TIMEOUT_DURATION_MILLIS = TimeUnit.MINUTES.toMillis(10); - - /** - * The query sent to the broker. This query uses replication_factor to determine how many copies of a segment has to be - * loaded as per the load rules. - * - If a segment is not used, the broker will not have any information about it, hence, a COUNT(*) should return the used count only. - * - If replication_factor is more than 0, the segment will be loaded on historicals and needs to be waited for. - * - If replication_factor is 0, that means that the segment will never be loaded on a historical and does not need to - * be waited for. - * - If replication_factor is -1, the replication factor is not known currently and will become known after a load rule - * evaluation. - *
- * See this for more details about replication_factor - */ - private static final String LOAD_QUERY = "SELECT COUNT(*) AS usedSegments,\n" - + "COUNT(*) FILTER (WHERE is_published = 1 AND replication_factor > 0) AS precachedSegments,\n" - + "COUNT(*) FILTER (WHERE is_published = 1 AND replication_factor = 0) AS onDemandSegments,\n" - + "COUNT(*) FILTER (WHERE is_available = 0 AND is_published = 1 AND replication_factor != 0) AS pendingSegments,\n" - + "COUNT(*) FILTER (WHERE replication_factor = -1) AS unknownSegments\n" - + "FROM sys.segments\n" - + "WHERE datasource = '%s' AND is_overshadowed = 0 AND (%s)"; - - private final BrokerClient brokerClient; - private final ObjectMapper objectMapper; - // Map of version vs latest load status. - private final AtomicReference versionLoadStatusReference; - private final String datasource; - private final String versionsConditionString; - private final int totalSegmentsGenerated; - private final boolean doWait; - // since live reports fetch the value in another thread, we need to use AtomicReference - private final AtomicReference status; - - private final ListeningExecutorService executorService; - - public SegmentLoadStatusFetcher( - BrokerClient brokerClient, - ObjectMapper objectMapper, - String queryId, - String datasource, - Set dataSegments, - boolean doWait - ) - { - this.brokerClient = brokerClient; - this.objectMapper = objectMapper; - this.datasource = datasource; - this.versionsConditionString = createVersionCondition(dataSegments); - this.totalSegmentsGenerated = dataSegments.size(); - this.versionLoadStatusReference = new AtomicReference<>(new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated)); - this.status = new AtomicReference<>(new SegmentLoadWaiterStatus( - State.INIT, - null, - 0, - totalSegmentsGenerated, - 0, - 0, - 0, - 0, - totalSegmentsGenerated - )); - this.doWait = doWait; - this.executorService = MoreExecutors.listeningDecorator( - Execs.singleThreaded(StringUtils.encodeForFormat(queryId) + "-segment-load-waiter-%d") - ); - } - - /** - * Uses broker client to check if all segments created by the ingestion have been loaded and updates the {@link #status)} - * periodically. - *
- * If an exception is thrown during operation, this will log the exception and return without failing the task, - * since the segments have already been published successfully, and should be loaded eventually. - *
- * Only expected to be called from the main controller thread. - */ - public void waitForSegmentsToLoad() - { - final DateTime startTime = DateTimes.nowUtc(); - final AtomicReference hasAnySegmentBeenLoaded = new AtomicReference<>(false); - try { - FutureUtils.getUnchecked(executorService.submit(() -> { - long lastLogMillis = -TimeUnit.MINUTES.toMillis(1); - try { - while (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) { - // Check the timeout and exit if exceeded. - long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis(); - if (runningMillis > TIMEOUT_DURATION_MILLIS) { - log.warn( - "Runtime[%d] exceeded timeout[%d] while waiting for segments to load. Exiting.", - runningMillis, - TIMEOUT_DURATION_MILLIS - ); - updateStatus(State.TIMED_OUT, startTime); - return; - } - - if (runningMillis - lastLogMillis >= TimeUnit.MINUTES.toMillis(1)) { - lastLogMillis = runningMillis; - log.info( - "Fetching segment load status for datasource[%s] from broker", - datasource - ); - } - - // Fetch the load status from the broker - VersionLoadStatus loadStatus = fetchLoadStatusFromBroker(); - versionLoadStatusReference.set(loadStatus); - hasAnySegmentBeenLoaded.set(hasAnySegmentBeenLoaded.get() || loadStatus.getUsedSegments() > 0); - - if (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) { - // Update the status. - updateStatus(State.WAITING, startTime); - // Sleep for a bit before checking again. - waitIfNeeded(SLEEP_DURATION_MILLIS); - } - } - } - catch (Exception e) { - log.warn(e, "Exception occurred while waiting for segments to load. Exiting."); - // Update the status and return. - updateStatus(State.FAILED, startTime); - return; - } - // Update the status. - log.info("Segment loading completed for datasource[%s]", datasource); - updateStatus(State.SUCCESS, startTime); - }), true); - } - catch (Exception e) { - log.warn(e, "Exception occurred while waiting for segments to load. Exiting."); - updateStatus(State.FAILED, startTime); - } - finally { - executorService.shutdownNow(); - } - } - private void waitIfNeeded(long waitTimeMillis) throws Exception - { - if (doWait) { - Thread.sleep(waitTimeMillis); - } - } - - /** - * Updates the {@link #status} with the latest details based on {@link #versionLoadStatusReference} - */ - private void updateStatus(State state, DateTime startTime) - { - long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis(); - VersionLoadStatus versionLoadStatus = versionLoadStatusReference.get(); - status.set( - new SegmentLoadWaiterStatus( - state, - startTime, - runningMillis, - totalSegmentsGenerated, - versionLoadStatus.getUsedSegments(), - versionLoadStatus.getPrecachedSegments(), - versionLoadStatus.getOnDemandSegments(), - versionLoadStatus.getPendingSegments(), - versionLoadStatus.getUnknownSegments() - ) - ); - } - - /** - * Uses {@link #brokerClient} to fetch latest load status for a given set of versions. Converts the response into a - * {@link VersionLoadStatus} and returns it. - */ - private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception - { - Request request = brokerClient.makeRequest(HttpMethod.POST, "/druid/v2/sql/"); - SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsConditionString), - ResultFormat.OBJECTLINES, - false, false, false, null, null - ); - request.setContent(MediaType.APPLICATION_JSON, objectMapper.writeValueAsBytes(sqlQuery)); - String response = brokerClient.sendQuery(request); - - if (response == null) { - // Unable to query broker - return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated); - } else if (response.trim().isEmpty()) { - // If no segments are returned for a version, all segments have been dropped by a drop rule. - return new VersionLoadStatus(0, 0, 0, 0, 0); - } else { - return objectMapper.readValue(response, VersionLoadStatus.class); - } - } - - /** - * Takes a list of segments and creates the condition for the broker query. Directly creates a string to avoid - * computing it repeatedly. - */ - private static String createVersionCondition(Set dataSegments) - { - // Creates a map of version to earliest and latest partition numbers created. These would be contiguous since the task - // holds the lock. - Map> versionsVsPartitionNumberRangeMap = new HashMap<>(); - - dataSegments.forEach(segment -> { - final String version = segment.getVersion(); - final int partitionNum = segment.getId().getPartitionNum(); - versionsVsPartitionNumberRangeMap.computeIfPresent(version, (k, v) -> Pair.of( - partitionNum < v.lhs ? partitionNum : v.lhs, - partitionNum > v.rhs ? partitionNum : v.rhs - )); - versionsVsPartitionNumberRangeMap.computeIfAbsent(version, k -> Pair.of(partitionNum, partitionNum)); - }); - - // Create a condition for each version / partition - List versionConditionList = new ArrayList<>(); - for (Map.Entry> stringPairEntry : versionsVsPartitionNumberRangeMap.entrySet()) { - Pair pair = stringPairEntry.getValue(); - versionConditionList.add( - StringUtils.format("(version = '%s' AND partition_num BETWEEN %s AND %s)", stringPairEntry.getKey(), pair.lhs, pair.rhs) - ); - } - return String.join(" OR ", versionConditionList); - } - - /** - * Returns the current status of the load. - */ - public SegmentLoadWaiterStatus status() - { - return status.get(); - } - - @Override - public void close() - { - try { - executorService.shutdownNow(); - } - catch (Throwable suppressed) { - log.warn(suppressed, "Error shutting down SegmentLoadStatusFetcher"); - } - } - - public static class SegmentLoadWaiterStatus - { - private final State state; - private final DateTime startTime; - private final long duration; - private final int totalSegments; - private final int usedSegments; - private final int precachedSegments; - private final int onDemandSegments; - private final int pendingSegments; - private final int unknownSegments; - - @JsonCreator - public SegmentLoadWaiterStatus( - @JsonProperty("state") SegmentLoadStatusFetcher.State state, - @JsonProperty("startTime") @Nullable DateTime startTime, - @JsonProperty("duration") long duration, - @JsonProperty("totalSegments") int totalSegments, - @JsonProperty("usedSegments") int usedSegments, - @JsonProperty("precachedSegments") int precachedSegments, - @JsonProperty("onDemandSegments") int onDemandSegments, - @JsonProperty("pendingSegments") int pendingSegments, - @JsonProperty("unknownSegments") int unknownSegments - ) - { - this.state = state; - this.startTime = startTime; - this.duration = duration; - this.totalSegments = totalSegments; - this.usedSegments = usedSegments; - this.precachedSegments = precachedSegments; - this.onDemandSegments = onDemandSegments; - this.pendingSegments = pendingSegments; - this.unknownSegments = unknownSegments; - } - - @JsonProperty - public SegmentLoadStatusFetcher.State getState() - { - return state; - } - - @Nullable - @JsonProperty - @JsonInclude(JsonInclude.Include.NON_NULL) - public DateTime getStartTime() - { - return startTime; - } - - @JsonProperty - public long getDuration() - { - return duration; - } - - @JsonProperty - public long getTotalSegments() - { - return totalSegments; - } - - @JsonProperty - public int getUsedSegments() - { - return usedSegments; - } - - @JsonProperty - public int getPrecachedSegments() - { - return precachedSegments; - } - - @JsonProperty - public int getOnDemandSegments() - { - return onDemandSegments; - } - - @JsonProperty - public int getPendingSegments() - { - return pendingSegments; - } - - @JsonProperty - public int getUnknownSegments() - { - return unknownSegments; - } - } - - public enum State - { - /** - * Initial state after being initialised with the segment versions and before #waitForSegmentsToLoad has been called. - */ - INIT, - /** - * All segments that need to be loaded have not yet been loaded. The load status is perodically being queried from - * the broker. - */ - WAITING, - /** - * All segments which need to be loaded have been loaded, and the SegmentLoadWaiter exited successfully. - */ - SUCCESS, - /** - * An exception occurred while checking load status. The SegmentLoadWaiter exited without failing the task. - */ - FAILED, - /** - * The time spent waiting for segments to load exceeded org.apache.druid.msq.exec.SegmentLoadWaiter#TIMEOUT_DURATION_MILLIS. - * The SegmentLoadWaiter exited without failing the task. - */ - TIMED_OUT; - - public boolean isFinished() - { - return this == SUCCESS || this == FAILED || this == TIMED_OUT; - } - } - - public static class VersionLoadStatus - { - private final int usedSegments; - private final int precachedSegments; - private final int onDemandSegments; - private final int pendingSegments; - private final int unknownSegments; - - @JsonCreator - public VersionLoadStatus( - @JsonProperty("usedSegments") int usedSegments, - @JsonProperty("precachedSegments") int precachedSegments, - @JsonProperty("onDemandSegments") int onDemandSegments, - @JsonProperty("pendingSegments") int pendingSegments, - @JsonProperty("unknownSegments") int unknownSegments - ) - { - this.usedSegments = usedSegments; - this.precachedSegments = precachedSegments; - this.onDemandSegments = onDemandSegments; - this.pendingSegments = pendingSegments; - this.unknownSegments = unknownSegments; - } - - @JsonProperty - public int getUsedSegments() - { - return usedSegments; - } - - @JsonProperty - public int getPrecachedSegments() - { - return precachedSegments; - } - - @JsonProperty - public int getOnDemandSegments() - { - return onDemandSegments; - } - - @JsonProperty - public int getPendingSegments() - { - return pendingSegments; - } - - @JsonProperty - public int getUnknownSegments() - { - return unknownSegments; - } - - @JsonIgnore - public boolean isLoadingComplete() - { - return pendingSegments == 0 && (usedSegments == precachedSegments + onDemandSegments); - } - } -} + package org.apache.druid.msq.exec; + + import com.fasterxml.jackson.annotation.JsonCreator; + import com.fasterxml.jackson.annotation.JsonIgnore; + import com.fasterxml.jackson.annotation.JsonInclude; + import com.fasterxml.jackson.annotation.JsonProperty; + import com.fasterxml.jackson.databind.ObjectMapper; + import com.google.common.util.concurrent.ListeningExecutorService; + import com.google.common.util.concurrent.MoreExecutors; + import org.apache.druid.common.guava.FutureUtils; + import org.apache.druid.java.util.common.DateTimes; + import org.apache.druid.java.util.common.Pair; + import org.apache.druid.java.util.common.StringUtils; + import org.apache.druid.java.util.common.concurrent.Execs; + import org.apache.druid.java.util.common.logger.Logger; + import org.apache.druid.sql.client.BrokerClient; + import org.apache.druid.sql.http.ResultFormat; + import org.apache.druid.sql.http.SqlQuery; + import org.apache.druid.timeline.DataSegment; + import org.joda.time.DateTime; + import org.joda.time.Interval; + + import javax.annotation.Nullable; + import javax.ws.rs.core.MediaType; + import java.util.ArrayList; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicReference; + + /** + * Class that periodically checks with the broker if all the segments generated are loaded by querying the sys table + * and blocks till it is complete. This will account for and not wait for segments that would never be loaded due to + * load rules. Should only be called if the query generates new segments or tombstones. + *
+ * If an exception is thrown during operation, this will simply log the exception and exit without failing the task, + * since the segments have already been published successfully, and should be loaded eventually. + *
+ * If the segments are not loaded within {@link #TIMEOUT_DURATION_MILLIS} milliseconds, this logs a warning and exits + * for the same reason. + */ + public class SegmentLoadStatusFetcher implements AutoCloseable + { + private static final Logger log = new Logger(SegmentLoadStatusFetcher.class); + private static final long SLEEP_DURATION_MILLIS = TimeUnit.SECONDS.toMillis(5); + private static final long TIMEOUT_DURATION_MILLIS = TimeUnit.MINUTES.toMillis(10); + + /** + * The query sent to the broker. This query uses replication_factor to determine how many copies of a segment has to be + * loaded as per the load rules. + * - If a segment is not used, the broker will not have any information about it, hence, a COUNT(*) should return the used count only. + * - If replication_factor is more than 0, the segment will be loaded on historicals and needs to be waited for. + * - If replication_factor is 0, that means that the segment will never be loaded on a historical and does not need to + * be waited for. + * - If replication_factor is -1, the replication factor is not known currently and will become known after a load rule + * evaluation. + *
+ * See this for more details about replication_factor + */ + private static final String LOAD_QUERY = "SELECT COUNT(*) AS usedSegments,\n" + + "COUNT(*) FILTER (WHERE is_published = 1 AND replication_factor > 0) AS precachedSegments,\n" + + "COUNT(*) FILTER (WHERE is_published = 1 AND replication_factor = 0) AS onDemandSegments,\n" + + "COUNT(*) FILTER (WHERE is_available = 0 AND is_published = 1 AND replication_factor != 0) AS pendingSegments,\n" + + "COUNT(*) FILTER (WHERE replication_factor = -1) AS unknownSegments\n" + + "FROM sys.segments\n" + + "WHERE datasource = '%s' AND is_overshadowed = 0 AND (%s)"; + + private final BrokerClient brokerClient; + private final ObjectMapper objectMapper; + // Map of version vs latest load status. + private final AtomicReference versionLoadStatusReference; + private final String datasource; + private final String versionsConditionString; + private final int totalSegmentsGenerated; + private final boolean doWait; + // since live reports fetch the value in another thread, we need to use AtomicReference + private final AtomicReference status; + + private final ListeningExecutorService executorService; + + public SegmentLoadStatusFetcher( + BrokerClient brokerClient, + ObjectMapper objectMapper, + String queryId, + String datasource, + Set dataSegments, + boolean doWait + ) + { + this.brokerClient = brokerClient; + this.objectMapper = objectMapper; + this.datasource = datasource; + this.versionsConditionString = createVersionCondition(dataSegments); + this.totalSegmentsGenerated = dataSegments.size(); + this.versionLoadStatusReference = new AtomicReference<>(new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated)); + this.status = new AtomicReference<>(new SegmentLoadWaiterStatus( + State.INIT, + null, + 0, + totalSegmentsGenerated, + 0, + 0, + 0, + 0, + totalSegmentsGenerated + )); + this.doWait = doWait; + this.executorService = MoreExecutors.listeningDecorator( + Execs.singleThreaded(StringUtils.encodeForFormat(queryId) + "-segment-load-waiter-%d") + ); + } + + /** + * Uses broker client to check if all segments created by the ingestion have been loaded and updates the {@link #status)} + * periodically. + *
+ * If an exception is thrown during operation, this will log the exception and return without failing the task, + * since the segments have already been published successfully, and should be loaded eventually. + *
+ * Only expected to be called from the main controller thread. + */ + public void waitForSegmentsToLoad() + { + final DateTime startTime = DateTimes.nowUtc(); + final AtomicReference hasAnySegmentBeenLoaded = new AtomicReference<>(false); + try { + FutureUtils.getUnchecked(executorService.submit(() -> { + long lastLogMillis = -TimeUnit.MINUTES.toMillis(1); + try { + while (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) { + // Check the timeout and exit if exceeded. + long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis(); + if (runningMillis > TIMEOUT_DURATION_MILLIS) { + log.warn( + "Runtime[%d] exceeded timeout[%d] while waiting for segments to load. Exiting.", + runningMillis, + TIMEOUT_DURATION_MILLIS + ); + updateStatus(State.TIMED_OUT, startTime); + return; + } + + if (runningMillis - lastLogMillis >= TimeUnit.MINUTES.toMillis(1)) { + lastLogMillis = runningMillis; + log.info( + "Fetching segment load status for datasource[%s] from broker", + datasource + ); + } + + // Fetch the load status from the broker + VersionLoadStatus loadStatus = fetchLoadStatusFromBroker(); + versionLoadStatusReference.set(loadStatus); + hasAnySegmentBeenLoaded.set(hasAnySegmentBeenLoaded.get() || loadStatus.getUsedSegments() > 0); + + if (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) { + // Update the status. + updateStatus(State.WAITING, startTime); + // Sleep for a bit before checking again. + waitIfNeeded(SLEEP_DURATION_MILLIS); + } + } + } + catch (Exception e) { + log.warn(e, "Exception occurred while waiting for segments to load. Exiting."); + // Update the status and return. + updateStatus(State.FAILED, startTime); + return; + } + // Update the status. + log.info("Segment loading completed for datasource[%s]", datasource); + updateStatus(State.SUCCESS, startTime); + }), true); + } + catch (Exception e) { + log.warn(e, "Exception occurred while waiting for segments to load. Exiting."); + updateStatus(State.FAILED, startTime); + } + finally { + executorService.shutdownNow(); + } + } + + private void waitIfNeeded(long waitTimeMillis) throws Exception + { + if (doWait) { + Thread.sleep(waitTimeMillis); + } + } + + /** + * Updates the {@link #status} with the latest details based on {@link #versionLoadStatusReference} + */ + private void updateStatus(State state, DateTime startTime) + { + long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis(); + VersionLoadStatus versionLoadStatus = versionLoadStatusReference.get(); + status.set( + new SegmentLoadWaiterStatus( + state, + startTime, + runningMillis, + totalSegmentsGenerated, + versionLoadStatus.getUsedSegments(), + versionLoadStatus.getPrecachedSegments(), + versionLoadStatus.getOnDemandSegments(), + versionLoadStatus.getPendingSegments(), + versionLoadStatus.getUnknownSegments() + ) + ); + } + + /** + * Uses {@link #brokerClient} to fetch latest load status for a given set of versions. Converts the response into a + * {@link VersionLoadStatus} and returns it. + */ + private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception + { + SqlQuery sqlQuery = new SqlQuery( + StringUtils.format(LOAD_QUERY, datasource, versionsConditionString), + ResultFormat.OBJECTLINES, + false, false, false, null, null + ); + String response = brokerClient.submit(sqlQuery); + + if (response == null) { + // Unable to query broker + return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated); + } else if (response.trim().isEmpty()) { + // If no segments are returned for a version, all segments have been dropped by a drop rule. + return new VersionLoadStatus(0, 0, 0, 0, 0); + } else { + return objectMapper.readValue(response, VersionLoadStatus.class); + } + } + + /** + * Takes a list of segments and creates the condition for the broker query. Directly creates a string to avoid + * computing it repeatedly. + */ + private static String createVersionCondition(Set dataSegments) + { + // Creates a map of version to earliest and latest partition numbers created. These would be contiguous since the task + // holds the lock. + Map> versionsVsPartitionNumberRangeMap = new HashMap<>(); + + dataSegments.forEach(segment -> { + final String version = segment.getVersion(); + final int partitionNum = segment.getId().getPartitionNum(); + versionsVsPartitionNumberRangeMap.computeIfPresent(version, (k, v) -> Pair.of( + partitionNum < v.lhs ? partitionNum : v.lhs, + partitionNum > v.rhs ? partitionNum : v.rhs + )); + versionsVsPartitionNumberRangeMap.computeIfAbsent(version, k -> Pair.of(partitionNum, partitionNum)); + }); + + // Create a condition for each version / partition + List versionConditionList = new ArrayList<>(); + for (Map.Entry> stringPairEntry : versionsVsPartitionNumberRangeMap.entrySet()) { + Pair pair = stringPairEntry.getValue(); + versionConditionList.add( + StringUtils.format("(version = '%s' AND partition_num BETWEEN %s AND %s)", stringPairEntry.getKey(), pair.lhs, pair.rhs) + ); + } + return String.join(" OR ", versionConditionList); + } + + /** + * Returns the current status of the load. + */ + public SegmentLoadWaiterStatus status() + { + return status.get(); + } + + @Override + public void close() + { + try { + executorService.shutdownNow(); + } + catch (Throwable suppressed) { + log.warn(suppressed, "Error shutting down SegmentLoadStatusFetcher"); + } + } + + public static class SegmentLoadWaiterStatus + { + private final State state; + private final DateTime startTime; + private final long duration; + private final int totalSegments; + private final int usedSegments; + private final int precachedSegments; + private final int onDemandSegments; + private final int pendingSegments; + private final int unknownSegments; + + @JsonCreator + public SegmentLoadWaiterStatus( + @JsonProperty("state") SegmentLoadStatusFetcher.State state, + @JsonProperty("startTime") @Nullable DateTime startTime, + @JsonProperty("duration") long duration, + @JsonProperty("totalSegments") int totalSegments, + @JsonProperty("usedSegments") int usedSegments, + @JsonProperty("precachedSegments") int precachedSegments, + @JsonProperty("onDemandSegments") int onDemandSegments, + @JsonProperty("pendingSegments") int pendingSegments, + @JsonProperty("unknownSegments") int unknownSegments + ) + { + this.state = state; + this.startTime = startTime; + this.duration = duration; + this.totalSegments = totalSegments; + this.usedSegments = usedSegments; + this.precachedSegments = precachedSegments; + this.onDemandSegments = onDemandSegments; + this.pendingSegments = pendingSegments; + this.unknownSegments = unknownSegments; + } + + @JsonProperty + public SegmentLoadStatusFetcher.State getState() + { + return state; + } + + @Nullable + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public DateTime getStartTime() + { + return startTime; + } + + @JsonProperty + public long getDuration() + { + return duration; + } + + @JsonProperty + public long getTotalSegments() + { + return totalSegments; + } + + @JsonProperty + public int getUsedSegments() + { + return usedSegments; + } + + @JsonProperty + public int getPrecachedSegments() + { + return precachedSegments; + } + + @JsonProperty + public int getOnDemandSegments() + { + return onDemandSegments; + } + + @JsonProperty + public int getPendingSegments() + { + return pendingSegments; + } + + @JsonProperty + public int getUnknownSegments() + { + return unknownSegments; + } + } + + public enum State + { + /** + * Initial state after being initialised with the segment versions and before #waitForSegmentsToLoad has been called. + */ + INIT, + /** + * All segments that need to be loaded have not yet been loaded. The load status is perodically being queried from + * the broker. + */ + WAITING, + /** + * All segments which need to be loaded have been loaded, and the SegmentLoadWaiter exited successfully. + */ + SUCCESS, + /** + * An exception occurred while checking load status. The SegmentLoadWaiter exited without failing the task. + */ + FAILED, + /** + * The time spent waiting for segments to load exceeded org.apache.druid.msq.exec.SegmentLoadWaiter#TIMEOUT_DURATION_MILLIS. + * The SegmentLoadWaiter exited without failing the task. + */ + TIMED_OUT; + + public boolean isFinished() + { + return this == SUCCESS || this == FAILED || this == TIMED_OUT; + } + } + + public static class VersionLoadStatus + { + private final int usedSegments; + private final int precachedSegments; + private final int onDemandSegments; + private final int pendingSegments; + private final int unknownSegments; + + @JsonCreator + public VersionLoadStatus( + @JsonProperty("usedSegments") int usedSegments, + @JsonProperty("precachedSegments") int precachedSegments, + @JsonProperty("onDemandSegments") int onDemandSegments, + @JsonProperty("pendingSegments") int pendingSegments, + @JsonProperty("unknownSegments") int unknownSegments + ) + { + this.usedSegments = usedSegments; + this.precachedSegments = precachedSegments; + this.onDemandSegments = onDemandSegments; + this.pendingSegments = pendingSegments; + this.unknownSegments = unknownSegments; + } + + @JsonProperty + public int getUsedSegments() + { + return usedSegments; + } + + @JsonProperty + public int getPrecachedSegments() + { + return precachedSegments; + } + + @JsonProperty + public int getOnDemandSegments() + { + return onDemandSegments; + } + + @JsonProperty + public int getPendingSegments() + { + return pendingSegments; + } + + @JsonProperty + public int getUnknownSegments() + { + return unknownSegments; + } + + @JsonIgnore \ No newline at end of file From b9b4b6d567da342fca4ede15103bc6d22d26ad8f Mon Sep 17 00:00:00 2001 From: Tarun Kancherla Date: Thu, 30 Jan 2025 23:00:11 +0530 Subject: [PATCH 2/2] Clean Diff --- .../msq/exec/SegmentLoadStatusFetcher.java | 38 ------------------- 1 file changed, 38 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java index 941a05fd2346..631a55d695e5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java @@ -16,9 +16,7 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.druid.msq.exec; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; @@ -38,7 +36,6 @@ import org.apache.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; - import javax.annotation.Nullable; import javax.ws.rs.core.MediaType; import java.util.ArrayList; @@ -48,7 +45,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - /** * Class that periodically checks with the broker if all the segments generated are loaded by querying the sys table * and blocks till it is complete. This will account for and not wait for segments that would never be loaded due to @@ -85,7 +81,6 @@ public class SegmentLoadStatusFetcher implements AutoCloseable + "COUNT(*) FILTER (WHERE replication_factor = -1) AS unknownSegments\n" + "FROM sys.segments\n" + "WHERE datasource = '%s' AND is_overshadowed = 0 AND (%s)"; - private final BrokerClient brokerClient; private final ObjectMapper objectMapper; // Map of version vs latest load status. @@ -96,9 +91,7 @@ public class SegmentLoadStatusFetcher implements AutoCloseable private final boolean doWait; // since live reports fetch the value in another thread, we need to use AtomicReference private final AtomicReference status; - private final ListeningExecutorService executorService; - public SegmentLoadStatusFetcher( BrokerClient brokerClient, ObjectMapper objectMapper, @@ -130,7 +123,6 @@ public SegmentLoadStatusFetcher( Execs.singleThreaded(StringUtils.encodeForFormat(queryId) + "-segment-load-waiter-%d") ); } - /** * Uses broker client to check if all segments created by the ingestion have been loaded and updates the {@link #status)} * periodically. @@ -201,14 +193,12 @@ public void waitForSegmentsToLoad() executorService.shutdownNow(); } } - private void waitIfNeeded(long waitTimeMillis) throws Exception { if (doWait) { Thread.sleep(waitTimeMillis); } } - /** * Updates the {@link #status} with the latest details based on {@link #versionLoadStatusReference} */ @@ -230,7 +220,6 @@ private void updateStatus(State state, DateTime startTime) ) ); } - /** * Uses {@link #brokerClient} to fetch latest load status for a given set of versions. Converts the response into a * {@link VersionLoadStatus} and returns it. @@ -243,7 +232,6 @@ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception false, false, false, null, null ); String response = brokerClient.submit(sqlQuery); - if (response == null) { // Unable to query broker return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated); @@ -254,7 +242,6 @@ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception return objectMapper.readValue(response, VersionLoadStatus.class); } } - /** * Takes a list of segments and creates the condition for the broker query. Directly creates a string to avoid * computing it repeatedly. @@ -264,7 +251,6 @@ private static String createVersionCondition(Set dataSegments) // Creates a map of version to earliest and latest partition numbers created. These would be contiguous since the task // holds the lock. Map> versionsVsPartitionNumberRangeMap = new HashMap<>(); - dataSegments.forEach(segment -> { final String version = segment.getVersion(); final int partitionNum = segment.getId().getPartitionNum(); @@ -274,7 +260,6 @@ private static String createVersionCondition(Set dataSegments) )); versionsVsPartitionNumberRangeMap.computeIfAbsent(version, k -> Pair.of(partitionNum, partitionNum)); }); - // Create a condition for each version / partition List versionConditionList = new ArrayList<>(); for (Map.Entry> stringPairEntry : versionsVsPartitionNumberRangeMap.entrySet()) { @@ -285,7 +270,6 @@ private static String createVersionCondition(Set dataSegments) } return String.join(" OR ", versionConditionList); } - /** * Returns the current status of the load. */ @@ -293,7 +277,6 @@ public SegmentLoadWaiterStatus status() { return status.get(); } - @Override public void close() { @@ -304,7 +287,6 @@ public void close() log.warn(suppressed, "Error shutting down SegmentLoadStatusFetcher"); } } - public static class SegmentLoadWaiterStatus { private final State state; @@ -316,7 +298,6 @@ public static class SegmentLoadWaiterStatus private final int onDemandSegments; private final int pendingSegments; private final int unknownSegments; - @JsonCreator public SegmentLoadWaiterStatus( @JsonProperty("state") SegmentLoadStatusFetcher.State state, @@ -340,13 +321,11 @@ public SegmentLoadWaiterStatus( this.pendingSegments = pendingSegments; this.unknownSegments = unknownSegments; } - @JsonProperty public SegmentLoadStatusFetcher.State getState() { return state; } - @Nullable @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) @@ -354,50 +333,42 @@ public DateTime getStartTime() { return startTime; } - @JsonProperty public long getDuration() { return duration; } - @JsonProperty public long getTotalSegments() { return totalSegments; } - @JsonProperty public int getUsedSegments() { return usedSegments; } - @JsonProperty public int getPrecachedSegments() { return precachedSegments; } - @JsonProperty public int getOnDemandSegments() { return onDemandSegments; } - @JsonProperty public int getPendingSegments() { return pendingSegments; } - @JsonProperty public int getUnknownSegments() { return unknownSegments; } } - public enum State { /** @@ -422,13 +393,11 @@ public enum State * The SegmentLoadWaiter exited without failing the task. */ TIMED_OUT; - public boolean isFinished() { return this == SUCCESS || this == FAILED || this == TIMED_OUT; } } - public static class VersionLoadStatus { private final int usedSegments; @@ -436,7 +405,6 @@ public static class VersionLoadStatus private final int onDemandSegments; private final int pendingSegments; private final int unknownSegments; - @JsonCreator public VersionLoadStatus( @JsonProperty("usedSegments") int usedSegments, @@ -452,35 +420,29 @@ public VersionLoadStatus( this.pendingSegments = pendingSegments; this.unknownSegments = unknownSegments; } - @JsonProperty public int getUsedSegments() { return usedSegments; } - @JsonProperty public int getPrecachedSegments() { return precachedSegments; } - @JsonProperty public int getOnDemandSegments() { return onDemandSegments; } - @JsonProperty public int getPendingSegments() { return pendingSegments; } - @JsonProperty public int getUnknownSegments() { return unknownSegments; } - @JsonIgnore \ No newline at end of file