From 8ca4c2b1960e4044008f10f98a744d116e2a6421 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 3 Oct 2023 23:31:19 +0530 Subject: [PATCH 1/2] Add parameter for segment load --- docs/multi-stage-query/reference.md | 1 + .../apache/druid/msq/exec/ControllerImpl.java | 42 ++++++----- .../msq/exec/SegmentLoadStatusFetcher.java | 73 +++++++------------ .../msq/util/MultiStageQueryContext.java | 10 +++ 4 files changed, 62 insertions(+), 64 deletions(-) diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 6236b6545258..f0ec20b46ddb 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -246,6 +246,7 @@ The following table lists the context parameters for the MSQ task engine: | `durableShuffleStorage` | SELECT, INSERT, REPLACE

Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`. If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error.

| `false` | | `faultTolerance` | SELECT, INSERT, REPLACE

Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` | | `selectDestination` | SELECT

Controls where the final result of the select query is written.
Use `taskReport` (the default) to write select results to the task report. This is not scalable, since the task report size explodes for large results.
Use `durableStorage` to write results to a durable storage location. For large result sets, it's recommended to use `durableStorage`. To configure durable storage, see [`this`](#durable-storage) section. | `taskReport` | +| `segmentLoadWait` | INSERT, REPLACE

Whether the controller should wait for segments to be loaded before exiting. If this is true, the controller queries the broker and waits till the segments created (if any) have been loaded by the load rules. The controller also provides this information in the live reports and task reports. If this is false, the controller exits immediately after finishing the query. | `false` | ## Joins diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 5cdd7b0ffa6e..bbd97c0afd44 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -463,14 +463,16 @@ public TaskStatus runTask(final Closer closer) } } + boolean shouldWaitForSegmentLoad = MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context()); try { releaseTaskLocks(); cleanUpDurableStorageIfNeeded(); if (queryKernel != null && queryKernel.isSuccess()) { - if (segmentLoadWaiter != null) { - // If successful and there are segments created, segmentLoadWaiter should wait for them to become available. + if (shouldWaitForSegmentLoad && segmentLoadWaiter != null) { + // If successful, there are segments created and segment load is enabled, segmentLoadWaiter should wait + // for them to become available. 
segmentLoadWaiter.waitForSegmentsToLoad(); } } @@ -1363,31 +1365,35 @@ private void publishAllSegments(final Set segments) throws IOExcept } } else { Set versionsToAwait = segmentsWithTombstones.stream().map(DataSegment::getVersion).collect(Collectors.toSet()); + if (MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context())) { + segmentLoadWaiter = new SegmentLoadStatusFetcher( + context.injector().getInstance(BrokerClient.class), + context.jsonMapper(), + task.getId(), + task.getDataSource(), + versionsToAwait, + segmentsWithTombstones.size(), + true + ); + } + performSegmentPublish( + context.taskActionClient(), + SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones) + ); + } + } else if (!segments.isEmpty()) { + Set versionsToAwait = segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet()); + if (MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context())) { segmentLoadWaiter = new SegmentLoadStatusFetcher( context.injector().getInstance(BrokerClient.class), context.jsonMapper(), task.getId(), task.getDataSource(), versionsToAwait, - segmentsWithTombstones.size(), + segments.size(), true ); - performSegmentPublish( - context.taskActionClient(), - SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones) - ); } - } else if (!segments.isEmpty()) { - Set versionsToAwait = segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet()); - segmentLoadWaiter = new SegmentLoadStatusFetcher( - context.injector().getInstance(BrokerClient.class), - context.jsonMapper(), - task.getId(), - task.getDataSource(), - versionsToAwait, - segments.size(), - true - ); // Append mode. 
performSegmentPublish( context.taskActionClient(), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java index 478c632a7491..d4268fe48d1c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java @@ -41,13 +41,10 @@ import javax.annotation.Nullable; import javax.ws.rs.core.MediaType; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; /** * Class that periodically checks with the broker if all the segments generated are loaded by querying the sys table @@ -84,14 +81,14 @@ public class SegmentLoadStatusFetcher implements AutoCloseable + "COUNT(*) FILTER (WHERE is_available = 0 AND is_published = 1 AND replication_factor != 0) AS pendingSegments,\n" + "COUNT(*) FILTER (WHERE replication_factor = -1) AS unknownSegments\n" + "FROM sys.segments\n" - + "WHERE datasource = '%s' AND is_overshadowed = 0 AND version = '%s'"; + + "WHERE datasource = '%s' AND is_overshadowed = 0 AND version in (%s)"; private final BrokerClient brokerClient; private final ObjectMapper objectMapper; // Map of version vs latest load status. 
- private final Map versionToLoadStatusMap; + private final AtomicReference versionLoadStatusReference; private final String datasource; - private final Set versionsToAwait; + private final String versionsInClauseString; private final int totalSegmentsGenerated; private final boolean doWait; // since live reports fetch the value in another thread, we need to use AtomicReference @@ -112,8 +109,11 @@ public SegmentLoadStatusFetcher( this.brokerClient = brokerClient; this.objectMapper = objectMapper; this.datasource = datasource; - this.versionsToAwait = new TreeSet<>(versionsToAwait); - this.versionToLoadStatusMap = new HashMap<>(); + this.versionsInClauseString = String.join( + ",", + versionsToAwait.stream().map(s -> StringUtils.format("'%s'", s)).collect(Collectors.toSet()) + ); + this.versionLoadStatusReference = new AtomicReference<>(new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated)); this.totalSegmentsGenerated = totalSegmentsGenerated; this.status = new AtomicReference<>(new SegmentLoadWaiterStatus( State.INIT, @@ -146,7 +146,7 @@ public void waitForSegmentsToLoad() try { FutureUtils.getUnchecked(executorService.submit(() -> { try { - while (!versionsToAwait.isEmpty()) { + while (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) { // Check the timeout and exit if exceeded. 
long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis(); if (runningMillis > TIMEOUT_DURATION_MILLIS) { @@ -159,29 +159,18 @@ public void waitForSegmentsToLoad() return; } - Iterator iterator = versionsToAwait.iterator(); - log.info( + log.debug( "Fetching segment load status for datasource[%s] from broker for segment versions[%s]", datasource, - versionsToAwait + versionsInClauseString ); - // Query the broker for all pending versions - while (iterator.hasNext()) { - String version = iterator.next(); - - // Fetch the load status for this version from the broker - VersionLoadStatus loadStatus = fetchLoadStatusForVersion(version); - versionToLoadStatusMap.put(version, loadStatus); - hasAnySegmentBeenLoaded.set(hasAnySegmentBeenLoaded.get() || loadStatus.getUsedSegments() > 0); - - // If loading is done for this stage, remove it from future loops. - if (hasAnySegmentBeenLoaded.get() && loadStatus.isLoadingComplete()) { - iterator.remove(); - } - } + // Fetch the load status from the broker + VersionLoadStatus loadStatus = fetchLoadStatusFromBroker(); + versionLoadStatusReference.set(loadStatus); + hasAnySegmentBeenLoaded.set(hasAnySegmentBeenLoaded.get() || loadStatus.getUsedSegments() > 0); - if (!versionsToAwait.isEmpty()) { + if (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) { // Update the status. updateStatus(State.WAITING, startTime); // Sleep for a bit before checking again. 
@@ -216,43 +205,35 @@ private void waitIfNeeded(long waitTimeMillis) throws Exception } /** - * Updates the {@link #status} with the latest details based on {@link #versionToLoadStatusMap} + * Updates the {@link #status} with the latest details based on {@link #versionLoadStatusReference} */ private void updateStatus(State state, DateTime startTime) { - int pendingSegmentCount = 0, usedSegmentsCount = 0, precachedSegmentCount = 0, onDemandSegmentCount = 0, unknownSegmentCount = 0; - for (Map.Entry entry : versionToLoadStatusMap.entrySet()) { - usedSegmentsCount += entry.getValue().getUsedSegments(); - precachedSegmentCount += entry.getValue().getPrecachedSegments(); - onDemandSegmentCount += entry.getValue().getOnDemandSegments(); - unknownSegmentCount += entry.getValue().getUnknownSegments(); - pendingSegmentCount += entry.getValue().getPendingSegments(); - } - long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis(); + VersionLoadStatus versionLoadStatus = versionLoadStatusReference.get(); status.set( new SegmentLoadWaiterStatus( state, startTime, runningMillis, totalSegmentsGenerated, - usedSegmentsCount, - precachedSegmentCount, - onDemandSegmentCount, - pendingSegmentCount, - unknownSegmentCount + versionLoadStatus.getUsedSegments(), + versionLoadStatus.getPrecachedSegments(), + versionLoadStatus.getOnDemandSegments(), + versionLoadStatus.getPendingSegments(), + versionLoadStatus.getUnknownSegments() ) ); } /** - * Uses {@link #brokerClient} to fetch latest load status for a given version. Converts the response into a + * Uses {@link #brokerClient} to fetch latest load status for a given set of versions. Converts the response into a * {@link VersionLoadStatus} and returns it. 
*/ - private VersionLoadStatus fetchLoadStatusForVersion(String version) throws Exception + private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception { Request request = brokerClient.makeRequest(HttpMethod.POST, "/druid/v2/sql/"); - SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, version), + SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsInClauseString), ResultFormat.OBJECTLINES, false, false, false, null, null ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 265f5eae0fe1..081926ebddde 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -97,6 +97,8 @@ public class MultiStageQueryContext public static final String CTX_FAULT_TOLERANCE = "faultTolerance"; public static final boolean DEFAULT_FAULT_TOLERANCE = false; + public static final String CTX_SEGMENT_LOAD_WAIT = "segmentLoadWait"; + public static final boolean DEFAULT_SEGMENT_LOAD_WAIT = false; public static final String CTX_MAX_INPUT_BYTES_PER_WORKER = "maxInputBytesPerWorker"; public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode"; @@ -148,6 +150,14 @@ public static boolean isFaultToleranceEnabled(final QueryContext queryContext) ); } + public static boolean shouldWaitForSegmentLoad(final QueryContext queryContext) + { + return queryContext.getBoolean( + CTX_SEGMENT_LOAD_WAIT, + DEFAULT_SEGMENT_LOAD_WAIT + ); + } + public static boolean isReindex(final QueryContext queryContext) { return queryContext.getBoolean( From e910f0b3fad582c2bc85b0cd5b9fd8d2c303dd2a Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 5 Oct 2023 13:09:05 +0530 Subject: 
[PATCH 2/2] Address review comments --- docs/multi-stage-query/reference.md | 2 +- .../apache/druid/msq/exec/ControllerImpl.java | 2 ++ .../msq/exec/SegmentLoadStatusFetcher.java | 19 +++++++++++++------ .../msq/util/MultiStageQueryContext.java | 2 +- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index f0ec20b46ddb..010bbff2a270 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -246,7 +246,7 @@ The following table lists the context parameters for the MSQ task engine: | `durableShuffleStorage` | SELECT, INSERT, REPLACE

Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`. If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error.

| `false` | | `faultTolerance` | SELECT, INSERT, REPLACE

Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` | | `selectDestination` | SELECT

Controls where the final result of the select query is written.
Use `taskReport` (the default) to write select results to the task report. This is not scalable, since the task report size explodes for large results.
Use `durableStorage` to write results to a durable storage location. For large result sets, it's recommended to use `durableStorage`. To configure durable storage, see [`this`](#durable-storage) section. | `taskReport` | -| `segmentLoadWait` | INSERT, REPLACE

Whether the controller should wait for segments to be loaded before exiting. If this is true, the controller queries the broker and waits till the segments created (if any) have been loaded by the load rules. The controller also provides this information in the live reports and task reports. If this is false, the controller exits immediately after finishing the query. | `false` | +| `waitTillSegmentsLoad` | INSERT, REPLACE

If set, the ingest query waits for the generated segments to be loaded before exiting; otherwise, the ingest query exits without waiting. The task and live reports contain the information about the status of loading segments if this flag is set. This will ensure that any future queries made after the ingestion exits will include results from the ingestion. The drawback is that the controller task will stall till the segments are loaded. | `false` | ## Joins diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index bbd97c0afd44..ddb85821c79a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -473,6 +473,8 @@ public TaskStatus runTask(final Closer closer) if (shouldWaitForSegmentLoad && segmentLoadWaiter != null) { // If successful, there are segments created and segment load is enabled, segmentLoadWaiter should wait // for them to become available. + log.info("Controller will now wait for segments to be loaded. 
The query has already finished executing," + + " and results will be included once the segments are loaded, even if this query is cancelled now."); segmentLoadWaiter.waitForSegmentsToLoad(); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java index d4268fe48d1c..17f46bad23a2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java @@ -145,6 +145,7 @@ public void waitForSegmentsToLoad() final AtomicReference hasAnySegmentBeenLoaded = new AtomicReference<>(false); try { FutureUtils.getUnchecked(executorService.submit(() -> { + long lastLogMillis = -TimeUnit.MINUTES.toMillis(1); try { while (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) { // Check the timeout and exit if exceeded. 
@@ -159,11 +160,14 @@ public void waitForSegmentsToLoad() return; } - log.debug( - "Fetching segment load status for datasource[%s] from broker for segment versions[%s]", - datasource, - versionsInClauseString - ); + if (runningMillis - lastLogMillis >= TimeUnit.MINUTES.toMillis(1)) { + lastLogMillis = runningMillis; + log.info( + "Fetching segment load status for datasource[%s] from broker for segment versions[%s]", + datasource, + versionsInClauseString + ); + } // Fetch the load status from the broker VersionLoadStatus loadStatus = fetchLoadStatusFromBroker(); @@ -240,7 +244,10 @@ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception request.setContent(MediaType.APPLICATION_JSON, objectMapper.writeValueAsBytes(sqlQuery)); String response = brokerClient.sendQuery(request); - if (response.trim().isEmpty()) { + if (response == null) { + // Unable to query broker + return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated); + } else if (response.trim().isEmpty()) { // If no segments are returned for a version, all segments have been dropped by a drop rule. 
return new VersionLoadStatus(0, 0, 0, 0, 0); } else { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 081926ebddde..98dcd471d0fe 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -97,7 +97,7 @@ public class MultiStageQueryContext public static final String CTX_FAULT_TOLERANCE = "faultTolerance"; public static final boolean DEFAULT_FAULT_TOLERANCE = false; - public static final String CTX_SEGMENT_LOAD_WAIT = "segmentLoadWait"; + public static final String CTX_SEGMENT_LOAD_WAIT = "waitTillSegmentsLoad"; public static final boolean DEFAULT_SEGMENT_LOAD_WAIT = false; public static final String CTX_MAX_INPUT_BYTES_PER_WORKER = "maxInputBytesPerWorker";