diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java
index 49a698b42ae7..9c65ce445b1b 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java
@@ -28,6 +28,7 @@
import javax.inject.Inject;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Reserves the {@link GroupByQueryResources} for a given group by query and maps them to the query's resource ID.
@@ -67,21 +68,25 @@
* nested ones execute via an unoptimized way.
* 3. There's some knowledge to the mergeResults that the query runner passed to it is the one created by the corresponding toolchest's
* mergeRunners (which is the typical use case). This is encoded in the argument {@code willMergeRunner}, and is to be set by the callers.
- * The only production use case where this isn't true is when the broker is merging the results gathered from the historical)
+ * The only production use case where this isn't true is when the broker is merging the results gathered from the historical
*
* TESTING
* Unit tests mimic the broker-historical interaction in many places, which can lead to the code not working as intended because the assumptions don't hold.
* In many test cases, there are two nested mergeResults calls, the outer call mimics what the broker does, while the inner one mimics what the historical does,
* and the assumption (1) fails. Therefore, the testing code should assign a unique resource id b/w each mergeResults call, and also make sure that the top level mergeResults
* would have willMergeRunner = false, since it's being called on top of a mergeResults's runner, while the inner one would have willMergeRunner = true because its being
- * called on actual runners (as it happens in the brokers, and the historicals)
+ * called on actual runners (as it happens in the brokers, and the historicals).
+ *
+ * There is a test in GroupByResourcesReservationPoolTest that checks for deadlocks when the operations are interleaved in a
+ * certain maanner. It is ignored because it sleeps and can increase time when the test suite is run. Developers making any changes
+ * to this class, or a related class should manually verify that all the tests in the test class are running as expected.
*/
public class GroupByResourcesReservationPool
{
/**
* Map of query's resource id -> group by resources reserved for the query to execute
*/
- final ConcurrentHashMap pool = new ConcurrentHashMap<>();
+ final ConcurrentHashMap> pool = new ConcurrentHashMap<>();
/**
* Buffer pool from where the merge buffers are picked and reserved
@@ -104,19 +109,42 @@ public GroupByResourcesReservationPool(
}
/**
- * Reserves appropriate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map
+ * Reserves appropriate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map.
+ * This is a blocking call, and can block up to the given query's timeout
*/
public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner)
{
if (queryResourceId == null) {
throw DruidException.defensive("Query resource id must be populated");
}
- pool.compute(queryResourceId, (id, existingResource) -> {
- if (existingResource != null) {
- throw DruidException.defensive("Resource with the given identifier [%s] is already present", id);
- }
- return GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig);
- });
+
+ // First check if the query resource id is present in the map, and if not, populate a dummy reference. This will
+ // block other threads from populating the map with the same query id, and is essentially same as reserving a spot in
+ // the map for the given query id. Since the actual allocation of the resource might take longer than expected, we
+ // do it out of the critical section, once we have "reserved" the spot
+ AtomicReference reference = new AtomicReference<>(null);
+ AtomicReference existingResource = pool.putIfAbsent(queryResourceId, reference);
+
+ // Multiple attempts made to allocate the query resource for a given resource id. Throw an exception
+ //noinspection VariableNotUsedInsideIf
+ if (existingResource != null) {
+ throw DruidException.defensive("Resource with the given identifier [%s] is already present", queryResourceId);
+ }
+
+ GroupByQueryResources resources;
+ try {
+ // We have reserved a spot in the map. Now begin the blocking call.
+ resources = GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig);
+ }
+ catch (Throwable t) {
+ // Unable to allocate the resources, perform cleanup and rethrow the exception
+ pool.remove(queryResourceId);
+ throw t;
+ }
+
+ // Resources have been allocated, spot has been reserved. The reference would ALWAYS refer to 'null'. Refer the
+ // allocated resources from it
+ reference.compareAndSet(null, resources);
}
/**
@@ -125,7 +153,19 @@ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery,
@Nullable
public GroupByQueryResources fetch(QueryResourceId queryResourceId)
{
- return pool.get(queryResourceId);
+ AtomicReference resourcesReference = pool.get(queryResourceId);
+ if (resourcesReference == null) {
+ // There weren't any resources allocated corresponding to the provided resource id
+ return null;
+ }
+ GroupByQueryResources resource = resourcesReference.get();
+ if (resource == null) {
+ throw DruidException.defensive(
+ "Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found",
+ queryResourceId
+ );
+ }
+ return resource;
}
/**
@@ -133,9 +173,17 @@ public GroupByQueryResources fetch(QueryResourceId queryResourceId)
*/
public void clean(QueryResourceId queryResourceId)
{
- GroupByQueryResources resources = pool.remove(queryResourceId);
- if (resources != null) {
- resources.close();
+ AtomicReference resourcesReference = pool.remove(queryResourceId);
+ if (resourcesReference != null) {
+ GroupByQueryResources resource = resourcesReference.get();
+ // Reference should refer to a non-empty resource
+ if (resource == null) {
+ throw DruidException.defensive(
+ "Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found",
+ queryResourceId
+ );
+ }
+ resource.close();
}
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
index 5fa26d3fa2b4..6451fb9b943d 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
@@ -142,6 +142,8 @@ public GroupingEngine(
* {@link GroupByMergingQueryRunner} for a particular query. The resources are to be acquired once throughout the
* execution of the query, or need to be re-acquired (if needed). Users must ensure that throughout the execution,
* a query already holding the resources shouldn't request for more resources, because that can cause deadlocks.
+ *
+ * This method throws an exception if it is not able to allocate sufficient resources required for the query to succeed
*/
public static GroupByQueryResources prepareResource(
GroupByQuery query,
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java
new file mode 100644
index 000000000000..8d0f2d9e37dc
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.collections.DefaultBlockingPool;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.QueryResourceId;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+public class GroupByResourcesReservationPoolTest
+{
+
+ /**
+ * CONFIG + QUERY require exactly 1 merge buffer to succeed if 'willMergeRunners' is true while allocating the resources
+ */
+ private static final GroupByQueryConfig CONFIG = new GroupByQueryConfig();
+ private static final GroupByQuery QUERY = GroupByQuery.builder()
+ .setInterval(Intervals.ETERNITY)
+ .setDataSource("foo")
+ .setDimensions(
+ ImmutableList.of(
+ new DefaultDimensionSpec("dim2", "_d0")
+ )
+ )
+ .setGranularity(Granularities.ALL)
+ .setContext(
+ ImmutableMap.of("timeout", 0)
+ ) // Query can block indefinitely
+ .build();
+
+ /**
+ * This test confirms that the interleaved GroupByResourcesReservationPool.reserve() and GroupByResourcesReservationPool.clean()
+ * between multiple threads succeed. It is specifically designed to test the case when the operations are interleaved in the
+ * following manner:
+ *
+ * THREAD1 THREAD2
+ * pool.reserve(query1)
+ * pool.reserve(query2)
+ * pool.clean(query1)
+ *
+ * This test assumes a few things about the implementation of the interfaces, which are laid out in the comments.
+ *
+ * The test should complete under 10 seconds, and the majority of the time would be consumed by waiting for the thread
+ * that sleeps for 5 seconds
+ */
+ @Ignore(
+ "Isn't run as a part of CI since it sleeps for 5 seconds. Callers must run the test manually if any changes are made "
+ + "to the corresponding class"
+ )
+ @Test(timeout = 100_000L)
+ public void testInterleavedReserveAndRemove()
+ {
+ ExecutorService executor = Execs.multiThreaded(3, "group-by-resources-reservation-pool-test-%d");
+
+ // Sanity checks that the query will acquire exactly one merge buffer. This safeguards the test being useful in
+ // case the merge buffer acquisition code changes to acquire less than one merge buffer (the test would be
+ // useless in that case) or more than one merge buffer (the test would incorrectly fail in that case)
+ Assert.assertEquals(
+ 1,
+ GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(CONFIG, QUERY)
+ + GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(QUERY)
+ );
+
+ // Blocking pool with a single buffer, which means only one of the queries can succeed at a time
+ BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1);
+ GroupByResourcesReservationPool groupByResourcesReservationPool =
+ new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);
+
+ // Latch indicating that the first thread has called reservationPool.reserve()
+ CountDownLatch reserveCalledByFirstThread = new CountDownLatch(1);
+ // Latch indicating that the second thread has called reservationPool.reserve()
+ CountDownLatch reserveCalledBySecondThread = new CountDownLatch(1);
+ // Latch indicating that all the threads have been completed successfully. Main thread waits on this latch before exiting
+ CountDownLatch threadsCompleted = new CountDownLatch(2);
+
+ // THREAD 1
+ executor.submit(() -> {
+
+ QueryResourceId queryResourceId1 = new QueryResourceId("test-id-1")
+ {
+ @Override
+ public int hashCode()
+ {
+ // IMPORTANT ASSUMPTION: For the test to be useful, it assumes that under the hood we are using a
+ // ConcurrentHashMap (or a concurrent map with similar implementation) that
+ // implements granular locking of the nodes
+ // The hashCode of the queryResourceId used in Thread1 and Thread2 is the same. Therefore, both the queryIds
+ // would be guarded by the same lock
+ return 10;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return super.equals(o);
+ }
+ };
+ groupByResourcesReservationPool.reserve(queryResourceId1, QUERY, true);
+ reserveCalledByFirstThread.countDown();
+ try {
+ reserveCalledBySecondThread.await();
+ }
+ catch (InterruptedException e) {
+ Assert.fail("Interrupted while waiting for second reserve call to be made");
+ }
+ groupByResourcesReservationPool.clean(queryResourceId1);
+ threadsCompleted.countDown();
+ });
+
+ // THREAD 2
+ executor.submit(() -> {
+ try {
+ reserveCalledByFirstThread.await();
+ }
+ catch (InterruptedException e) {
+ Assert.fail("Interrupted while waiting for first reserve call to be made");
+ }
+
+ QueryResourceId queryResourceId2 = new QueryResourceId("test-id-2")
+ {
+ @Override
+ public int hashCode()
+ {
+ return 10;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return super.equals(o);
+ }
+ };
+
+ // Since the reserve() call is blocking, we need to execute it separately, so that we can count down the latch
+ // and inform Thread1 the reserve call has been made by this thread
+ executor.submit(() -> {
+ groupByResourcesReservationPool.reserve(queryResourceId2, QUERY, true);
+ threadsCompleted.countDown();
+ });
+ try {
+ // This sleep call "ensures" that the statment pool.reserve(queryResourceId2) is called before we release the
+ // latch (that will cause Thread1 to release the acquired resources). It still doesn't guarantee the previous
+ // statement, however that's the best we can do, given that reserve() is blocking
+ Thread.sleep(5_000);
+ }
+ catch (InterruptedException e) {
+ Assert.fail("Interrupted while sleeping");
+ }
+ reserveCalledBySecondThread.countDown();
+ });
+
+ try {
+ threadsCompleted.await();
+ }
+ catch (InterruptedException e) {
+ Assert.fail("Interrupted while waiting for the threads to complete");
+ }
+ }
+
+ @Test
+ public void testMultipleSimultaneousAllocationAttemptsFail()
+ {
+ BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1);
+ GroupByResourcesReservationPool groupByResourcesReservationPool =
+ new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);
+ QueryResourceId queryResourceId = new QueryResourceId("test-id");
+
+ groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);
+
+ Assert.assertThrows(
+ DruidException.class,
+ () -> groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true)
+ );
+ }
+
+ @Test
+ public void testMultipleSequentialAllocationAttemptsSucceed()
+ {
+ BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1);
+ GroupByResourcesReservationPool groupByResourcesReservationPool =
+ new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);
+ QueryResourceId queryResourceId = new QueryResourceId("test-id");
+
+ groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);
+ GroupByQueryResources oldResources = groupByResourcesReservationPool.fetch(queryResourceId);
+
+ // Cleanup the resources
+ groupByResourcesReservationPool.clean(queryResourceId);
+
+ // Repeat the calls
+ groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);
+ GroupByQueryResources newResources = groupByResourcesReservationPool.fetch(queryResourceId);
+ Assert.assertNotNull(newResources);
+
+ Assert.assertNotSame(oldResources, newResources);
+ }
+}