Merged

Changes from all commits (34 commits)
41bb660
Initial commit.
cryptoe Nov 7, 2022
4957b01
Merge things
cryptoe Nov 7, 2022
1930b3b
Fixing error message in retry exceeded exception
cryptoe Nov 7, 2022
1ed9346
Cleaning up some code
cryptoe Nov 9, 2022
14d8c60
Adding some test cases.
cryptoe Nov 10, 2022
f8943d3
Adding java docs.
cryptoe Nov 11, 2022
74fa66f
Merge things
cryptoe Nov 11, 2022
c92b217
Finishing up state test cases.
cryptoe Nov 13, 2022
58da3cf
Adding some more java docs and fixing spot bugs, intellij inspections
cryptoe Nov 13, 2022
7b90edc
Fixing intellij inspections and added tests
cryptoe Nov 14, 2022
111c599
Merge things
cryptoe Nov 14, 2022
deafdf8
Documenting error codes
cryptoe Nov 15, 2022
edfd5b6
Migrate current integration batch tests to equivalent MSQ tests (#13374)
abhagraw Nov 21, 2022
4d1a3b1
Adding ITTest which kills the worker abruptly
cryptoe Nov 28, 2022
ecb1942
Review comments phase one
cryptoe Nov 29, 2022
48c844a
Adding doc changes
cryptoe Nov 29, 2022
ac41754
Adjusting for single threaded execution.
cryptoe Nov 29, 2022
29b0c6e
Merge remote-tracking branch 'apache/master' into controller_state_retry
cryptoe Nov 29, 2022
7c31923
Adding Sequential Merge PR state handling
cryptoe Dec 8, 2022
2000965
Merge remote-tracking branch 'apache/master' into controller_state_retry
cryptoe Dec 8, 2022
d32397a
Merge things
cryptoe Dec 8, 2022
a286480
Fixing checkstyle.
cryptoe Dec 8, 2022
bf25675
Adding new context param for fault tolerance.
cryptoe Dec 9, 2022
a78ee50
Merge remote-tracking branch 'apache/master' into controller_state_retry
cryptoe Dec 9, 2022
e674cac
Merge things
cryptoe Dec 9, 2022
9e0b455
Merge remote-tracking branch 'apache/master' into controller_state_retry
Jan 5, 2023
0c22ce8
Merge things
Jan 9, 2023
a4f974c
Adding parameterized tests
cryptoe Jan 9, 2023
87d39dc
Adding missed files
cryptoe Jan 9, 2023
2de05f8
Review comments and fixing tests.
cryptoe Jan 9, 2023
85843de
Documentation things.
cryptoe Jan 9, 2023
791e2cb
Fixing IT
cryptoe Jan 10, 2023
820b5b7
Controller impl fix.
cryptoe Jan 10, 2023
dd9a6a2
Fixing racy WorkerSketchFetcherTest.java exception handling.
cryptoe Jan 10, 2023
10 changes: 10 additions & 0 deletions .travis.yml
@@ -525,6 +525,16 @@ jobs:
      env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
      script: ./it.sh travis MultiStageQuery

    - &integration_tests_ex
      name: "(Compile=openjdk8, Run=openjdk8) multi stage query tests with MM"
      stage: Tests - phase 2
      jdk: openjdk8
      services: *integration_test_services
      env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
      script: ./it.sh travis MultiStageQueryWithMM

    - &integration_tests_ex
      name: "(Compile=openjdk8, Run=openjdk8) catalog integration tests"
      stage: Tests - phase 2
@@ -0,0 +1,60 @@ TriConsumer.java (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.common.function;

import java.util.Objects;

/**
 * Based on {@link java.util.function.BiConsumer}
 */
@FunctionalInterface
public interface TriConsumer<T, U, V>
{
  /**
   * Performs this operation on the given arguments.
   *
   * @param t the first input argument
   * @param u the second input argument
   * @param v the third input argument
   */
  void accept(T t, U u, V v);

  /**
   * Returns a composed {@code TriConsumer} that performs, in sequence, this
   * operation followed by the {@code after} operation. If performing either
   * operation throws an exception, it is relayed to the caller of the
   * composed operation. If performing this operation throws an exception,
   * the {@code after} operation will not be performed.
   *
   * @param after the operation to perform after this operation
   * @return a composed {@code TriConsumer} that performs in sequence this
   * operation followed by the {@code after} operation
   * @throws NullPointerException if {@code after} is null
   */
  default TriConsumer<T, U, V> andThen(TriConsumer<? super T, ? super U, ? super V> after)
  {
    Objects.requireNonNull(after);

    return (t, u, v) -> {
      accept(t, u, v);
      after.accept(t, u, v);
    };
  }
}
@@ -0,0 +1,45 @@ TriConsumerTest.java (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.common.function;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashSet;
import java.util.Set;

public class TriConsumerTest
{
  @Test
  public void sanityTest()
  {
    Set<Integer> sumSet = new HashSet<>();
    TriConsumer<Integer, Integer, Integer> consumerA = (arg1, arg2, arg3) -> {
      sumSet.add(arg1 + arg2 + arg3);
    };
    TriConsumer<Integer, Integer, Integer> consumerB = (arg1, arg2, arg3) -> {
      sumSet.remove(arg1 + arg2 + arg3);
    };
    consumerA.andThen(consumerB).accept(1, 2, 3);

    Assert.assertTrue(sumSet.isEmpty());
  }
}

Contributor review comment on sanityTest:
I think there should be a test where the consumerA throws an exception after adding to the set, and then we should see if consumerB runs or not.
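A sketch of the test the reviewer describes, as it might be added to TriConsumerTest above (illustrative only, not part of this PR; it assumes JUnit 4.13's Assert.assertThrows and relies on the andThen contract documented in TriConsumer):

  @Test
  public void andThenSkipsSecondConsumerOnExceptionTest()
  {
    Set<Integer> sumSet = new HashSet<>();
    TriConsumer<Integer, Integer, Integer> consumerA = (arg1, arg2, arg3) -> {
      sumSet.add(arg1 + arg2 + arg3);
      throw new IllegalStateException("consumerA failed");
    };
    TriConsumer<Integer, Integer, Integer> consumerB = (arg1, arg2, arg3) -> {
      sumSet.remove(arg1 + arg2 + arg3);
    };

    // The exception from consumerA is relayed to the caller of the composed consumer.
    Assert.assertThrows(
        IllegalStateException.class,
        () -> consumerA.andThen(consumerB).accept(1, 2, 3)
    );
    // consumerA ran (the sum was added) but consumerB never did (nothing was removed).
    Assert.assertEquals(1, sumSet.size());
  }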
2 changes: 1 addition & 1 deletion docs/multi-stage-query/known-issues.md
@@ -29,7 +29,7 @@ sidebar_label: Known issues

## Multi-stage query task runtime

- - Fault tolerance is not implemented. If any task fails, the entire query fails.
+ - Fault tolerance is partially implemented. Workers get relaunched when they are killed unexpectedly. The controller does not get relaunched if it is killed unexpectedly.

- Worker task stage outputs are stored in the working directory given by `druid.indexer.task.baseDir`. Stages that
generate a large amount of output data may exhaust all available disk space. In this case, the query fails with
16 changes: 10 additions & 6 deletions docs/multi-stage-query/reference.md
@@ -322,6 +322,8 @@ The following table lists the context parameters for the MSQ task engine:
| `rowsPerSegment` | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 |
| `indexSpec` | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
| `clusterStatisticsMergeMode` | Whether to use parallel or sequential mode for merging worker sketches. Can be `PARALLEL`, `SEQUENTIAL` or `AUTO`. See [Sketch Merging Mode](#sketch-merging-mode) for more information. | `PARALLEL` |
| `durableShuffleStorage` | SELECT, INSERT, REPLACE<br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure durable storage at the server level using `druid.msq.intermediate.storage.enable=true`. If this property is not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. | `false` |
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br />Whether to enable fault tolerance mode. Failed worker tasks are retried, subject to the [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false (see the sketch after this table). | `false` |

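For illustration only (not part of this diff): a minimal sketch of building a query context that opts in to fault tolerance. Per the table above, `faultTolerance` cannot be used when `durableShuffleStorage` is explicitly false, so the two settings are paired here; the class and variable names are assumptions, and how the map is attached to a query submission is client-specific.

import java.util.HashMap;
import java.util.Map;

public class FaultToleranceContextExample
{
  public static void main(String[] args)
  {
    // Context keys are taken from the table above.
    Map<String, Object> msqContext = new HashMap<>();
    msqContext.put("faultTolerance", true);        // retry failed workers, subject to Limits
    msqContext.put("durableShuffleStorage", true); // durable storage must also be enabled server-side

    System.out.println(msqContext);
  }
}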
## Sketch Merging Mode
This section details the advantages and performance of various Cluster By Statistics Merge Modes.
@@ -332,17 +334,16 @@ reading rows from the datasource. These statistics must be transferred to the controller

`PARALLEL` mode fetches the key statistics for all time chunks from all workers together and the controller then downsamples
the sketch if it does not fit in memory. This is faster than `SEQUENTIAL` mode as there is less overhead in fetching sketches
- for all time chunks together. This is good for small sketches which won't be downsampled even if merged together or if
+ for all time chunks together. This is good for small sketches which won't be down sampled even if merged together or if
accuracy in segment sizing for the ingestion is not very important.

`SEQUENTIAL` mode fetches the sketches in ascending order of time and generates the partition boundaries for one time
chunk at a time. This gives more working memory to the controller for merging sketches, which results in less
- downsampling and thus, more accuracy. There is, however, a time overhead on fetching sketches in sequential order. This is
+ down sampling and thus, more accuracy. There is, however, a time overhead on fetching sketches in sequential order. This is
good for cases where accuracy is important.

- `AUTO` mode tries to find the best approach based on number of workers and size of input rows. If there are more
- than 100 workers or if the combined sketch size among all workers is more than 1GB, `SEQUENTIAL` is chosen, otherwise,
- `PARALLEL` is chosen.
+ `AUTO` mode tries to find the best approach based on number of workers. If there are more
+ than 100 workers, `SEQUENTIAL` is chosen, otherwise, `PARALLEL` is chosen.
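Restated as code for clarity, a sketch of the `AUTO` rule in the added lines above (not Druid's actual implementation; all names are illustrative):

public class AutoMergeModeRuleSketch
{
  enum Mode { PARALLEL, SEQUENTIAL }

  // More than 100 workers: fetch sketches sequentially, one time chunk at a time,
  // to give the controller more working memory; otherwise fetch in parallel.
  static Mode resolveAuto(int numWorkers)
  {
    return numWorkers > 100 ? Mode.SEQUENTIAL : Mode.PARALLEL;
  }

  public static void main(String[] args)
  {
    System.out.println(resolveAuto(50));   // PARALLEL
    System.out.println(resolveAuto(150));  // SEQUENTIAL
  }
}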

## Durable Storage

@@ -376,7 +377,8 @@ The following table lists query limits:
| Number of cluster by columns that can appear in a stage | 1,500 | [`TooManyClusteredByColumns`](#error_TooManyClusteredByColumns) |
| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | [`TooManyWorkers`](#error_TooManyWorkers) |
| Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](concepts.md#memory-usage). | [`BroadcastTablesTooLarge`](#error_BroadcastTablesTooLarge) |

| Maximum relaunch attempts per worker. The initial run is not a relaunch; a worker is spawned at most 1 + `workerRelaunchLimit` times before the job fails (see the sketch after this table). | 2 | `TooManyAttemptsForWorker` |
| Maximum relaunch attempts for a job, across all workers. | 100 | `TooManyAttemptsForJob` |
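How the two relaunch limits interact, sketched as code (a hypothetical accounting class; the names and structure are assumptions, not Druid's):

import java.util.HashMap;
import java.util.Map;

public class RelaunchBudgetSketch
{
  static final int PER_WORKER_RELAUNCH_LIMIT = 2;  // exceeded -> TooManyAttemptsForWorker
  static final int PER_JOB_RELAUNCH_LIMIT = 100;   // exceeded -> TooManyAttemptsForJob

  private final Map<Integer, Integer> relaunchesByWorker = new HashMap<>();
  private int totalRelaunches = 0;

  // The initial run is not a relaunch: a worker may be spawned
  // 1 + PER_WORKER_RELAUNCH_LIMIT times in total before the job fails.
  boolean mayRelaunch(int workerNumber)
  {
    int perWorker = relaunchesByWorker.getOrDefault(workerNumber, 0);
    if (perWorker >= PER_WORKER_RELAUNCH_LIMIT || totalRelaunches >= PER_JOB_RELAUNCH_LIMIT) {
      return false; // budget exhausted; the job fails with the corresponding error code
    }
    relaunchesByWorker.put(workerNumber, perWorker + 1);
    totalRelaunches++;
    return true;
  }
}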
<a name="errors"></a>

## Error codes
@@ -401,6 +403,8 @@ The following table describes error codes you may encounter in the `multiStageQuery` report:
| <a name="error_QueryNotSupported">`QueryNotSupported`</a> | QueryKit could not translate the provided native query to a multi-stage query.<br /> <br />This can happen if the query uses features that aren't supported, like GROUPING SETS. | |
| <a name="error_RowTooLarge">`RowTooLarge`</a> | The query tried to process a row that was too large to write to a single frame. See the [Limits](#limits) table for specific limits on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing. | `maxFrameSize`: The limit on the frame size. |
| <a name="error_TaskStartTimeout">`TaskStartTimeout`</a> | Unable to launch all the worker tasks in time. <br /> <br />There might be insufficient available slots to start all the worker tasks simultaneously.<br /> <br /> Try splitting the query into smaller chunks with a lower `maxNumTasks` value. Another option is to increase capacity. | `numTasks`: The number of tasks attempted to launch. |
| <a name="error_TooManyAttemptsForJob">`TooManyAttemptsForJob`</a> | The total relaunch attempt count across all workers exceeded the maximum relaunch attempt limit. See the [Limits](#limits) table for the specific limit. | `maxRelaunchCount`: The maximum number of relaunches across all workers, as defined in the [Limits](#limits) section. <br /><br /> `currentRelaunchCount`: The current relaunch counter for the job, across all workers. <br /><br /> `taskId`: The latest task ID that failed. <br /><br /> `rootErrorMessage`: The error message of the latest failed task. |
| <a name="error_TooManyAttemptsForWorker">`TooManyAttemptsForWorker`</a> | A worker exceeded the maximum relaunch attempt count, as defined in the [Limits](#limits) section. | `maxPerWorkerRelaunchCount`: The maximum number of relaunches allowed per worker, as defined in the [Limits](#limits) section. <br /><br /> `workerNumber`: The number of the worker whose task failed. <br /><br /> `taskId`: The latest task ID that failed. <br /><br /> `rootErrorMessage`: The error message of the latest failed task. |
| <a name="error_TooManyBuckets">`TooManyBuckets`</a> | Exceeded the maximum number of partition buckets for a stage (5,000 partition buckets).<br /><br />Partition buckets are created for each [`PARTITIONED BY`](#partitioned-by) time chunk for INSERT and REPLACE queries. The most common reason for this error is that your `PARTITIONED BY` is too narrow relative to your data. | `maxBuckets`: The limit on partition buckets. |
| <a name="error_TooManyInputFiles">`TooManyInputFiles`</a> | Exceeded the maximum number of input files or segments per worker (10,000 files or segments).<br /><br />If you encounter this limit, consider adding more workers, or breaking up your query into smaller queries that process fewer files or segments per query. | `numInputFiles`: The total number of input files/segments for the stage.<br /><br />`maxInputFiles`: The maximum number of input files/segments per worker per stage.<br /><br />`minNumWorker`: The minimum number of workers required for a successful run. |
| <a name="error_TooManyPartitions">`TooManyPartitions`</a> | Exceeded the maximum number of partitions for a stage (25,000 partitions).<br /><br />This can occur with INSERT or REPLACE statements that generate large numbers of segments, since each segment is associated with a partition. If you encounter this limit, consider breaking up your INSERT or REPLACE statement into smaller statements that process less data per statement. | `maxPartitions`: The limit on partitions which was exceeded |
@@ -104,7 +104,7 @@ public String getId()
/**
* Periodic update of {@link CounterSnapshots} from subtasks.
*/
- void updateCounters(CounterSnapshotsTree snapshotsTree);
+ void updateCounters(String taskId, CounterSnapshotsTree snapshotsTree);

/**
* Reports that results are ready for a subtask.
@@ -47,7 +47,7 @@ void postPartialKeyStatistics(
* Client-side method to update the controller with counters for a particular stage and worker. The controller uses
* this to compile live reports, track warnings generated etc.
*/
- void postCounters(CounterSnapshotsTree snapshotsTree) throws IOException;
+ void postCounters(String workerId, CounterSnapshotsTree snapshotsTree) throws IOException;

/**
* Client side method to update the controller with the result object for a particular stage and worker. This also
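For orientation, a hypothetical call-site sketch (names assumed, not code from this PR) of the widened counters API above: the worker now passes its own id along with the snapshots, so the controller can tell which task a snapshot came from.

  // Hypothetical worker-side helper built on the interfaces shown above.
  void reportCounters(ControllerClient controllerClient, String myTaskId, CounterSnapshotsTree snapshots) throws IOException
  {
    controllerClient.postCounters(myTaskId, snapshots);
  }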