From d56bbee3eaa58aea5fb45f1478c3da1f8e23ff46 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 2 May 2024 18:30:19 +0530 Subject: [PATCH 1/7] fix deadlock --- .../GroupByResourcesReservationPool.java | 8 +- .../GroupByResourcesReservationPoolTest.java | 174 ++++++++++++++++++ 2 files changed, 180 insertions(+), 2 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java index 49a698b42ae7..ca9c7bbca807 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java @@ -104,18 +104,22 @@ public GroupByResourcesReservationPool( } /** - * Reserves appropriate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map + * Reserves appropriate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map. */ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner) { if (queryResourceId == null) { throw DruidException.defensive("Query resource id must be populated"); } + GroupByQueryResources resources = + GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig); + pool.compute(queryResourceId, (id, existingResource) -> { if (existingResource != null) { + resources.close(); throw DruidException.defensive("Resource with the given identifier [%s] is already present", id); } - return GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig); + return resources; }); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java new file mode 100644 index 000000000000..e02271e57481 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.collections.DefaultBlockingPool; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.QueryResourceId; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +public class GroupByResourcesReservationPoolTest +{ + + /** + * This test confirms that the interleaved GroupByResourcesReservationPool.reserve() and GroupByResourcesReservationPool.clean() + * between multiple threads succeed. It is specifically designed to test the case when the operations are interleaved in the + * following manner: + *

+ * THREAD1 THREAD2 + * pool.reserve(query1) + * pool.reserve(query2) + * pool.clean(query1) + *

+ * This test assumes a few things about the implementation of the interfaces, which are laid out in the comments. + *

+ * The test should complete under 10 seconds, and the majority of the time would be consumed by waiting for the thread + * that sleeps for 5 seconds + */ + @Test(timeout = 100_000L) + public void testInterleavedReserveAndRemove() + { + ExecutorService executor = Execs.multiThreaded(3, "group-by-resources-reservation-pool-test-%d"); + + // Group by query + query configuration that would try to acquire exactly 1 merge buffer + GroupByQueryConfig config = new GroupByQueryConfig(); + GroupByQuery query = GroupByQuery.builder() + .setInterval(Intervals.ETERNITY) + .setDataSource("foo") + .setDimensions(ImmutableList.of(new DefaultDimensionSpec("dim2", "_d0"))) + .setGranularity(Granularities.ALL) + .setContext(ImmutableMap.of("timeout", 0)) // Query can block indefinitely + .build(); + + + // Blocking pool with a single buffer, which means only one of the queries can succeed at a time + BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1); + GroupByResourcesReservationPool groupByResourcesReservationPool = + new GroupByResourcesReservationPool(mergeBufferPool, config); + + // Latch indicating that the first thread has called reservationPool.reserve() + CountDownLatch reserveCalledByFirstThread = new CountDownLatch(1); + // Latch indicating that the second thread has called reservationPool.reserve() + CountDownLatch reserveCalledBySecondThread = new CountDownLatch(1); + // Latch indicating that all the threads have been completed successfully. Main thread waits on this latch before exiting + CountDownLatch threadsCompleted = new CountDownLatch(2); + + // THREAD 1 + executor.submit(() -> { + // Sanity checks that the query will acquire exactly one merge buffer. This safeguards the test being useful in + // case the merge buffer acquisition code changes to acquire less than one merge buffer (the test would be + // useless in that case) or more than one merge buffer (the test would incorrectly fail in that case) + assert (GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(config, query) + + GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(query) == 1); + + QueryResourceId queryResourceId1 = new QueryResourceId("test-id-1") + { + @Override + public int hashCode() + { + // IMPORTANT ASSUMPTION: For the test to be useful, it assumes that under the hood we are using a + // ConcurrentHashMap (or a concurrent map with similar implementation) that + // implements granular locking of the nodes + // The hashCode of the queryResourceId used in Thread1 and Thread2 is the same. Therefore, both the queryIds + // would be guarded by the same lock + return 10; + } + + @Override + public boolean equals(Object o) + { + return super.equals(o); + } + }; + groupByResourcesReservationPool.reserve(queryResourceId1, query, true); + reserveCalledByFirstThread.countDown(); + try { + reserveCalledBySecondThread.await(); + } + catch (InterruptedException e) { + Assert.fail("Interrupted while waiting for second reserve call to be made"); + } + groupByResourcesReservationPool.clean(queryResourceId1); + threadsCompleted.countDown(); + }); + + // THREAD 2 + executor.submit(() -> { + assert (GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(config, query) + + GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(query) == 1); + try { + reserveCalledByFirstThread.await(); + } + catch (InterruptedException e) { + Assert.fail("Interrupted while waiting for first reserve call to be made"); + } + + QueryResourceId queryResourceId2 = new QueryResourceId("test-id-2") + { + @Override + public int hashCode() + { + return 10; + } + + @Override + public boolean equals(Object o) + { + return super.equals(o); + } + }; + + // Since the reserve() call is blocking, we need to execute it separately, so that we can countdown the latch + // and inform Thread1 the reserve call has been made by this thread + executor.submit(() -> { + groupByResourcesReservationPool.reserve(queryResourceId2, query, true); + threadsCompleted.countDown(); + }); + try { + // This sleep call "ensures" that the statment pool.reserve(queryResourceId2) is called before we release the + // latch (that will cause Thread1 to release the acquired resources). It still doesn't guarantee the previous + // statement, however that's the best we can do, given that reserve() is blocking + Thread.sleep(5_000); + } + catch (InterruptedException e) { + Assert.fail("Interrupted while sleeping"); + } + reserveCalledBySecondThread.countDown(); + }); + + try { + threadsCompleted.await(); + } + catch (InterruptedException e) { + Assert.fail("Interrupted while waiting for the threads to complete"); + } + } +} From ecd27408617443b6f58cde5b8d1b8f188b259698 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 2 May 2024 20:02:35 +0530 Subject: [PATCH 2/7] static check --- .../groupby/GroupByResourcesReservationPool.java | 1 + .../GroupByResourcesReservationPoolTest.java | 16 ++++++++-------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java index ca9c7bbca807..cdcd0db38d06 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java @@ -105,6 +105,7 @@ public GroupByResourcesReservationPool( /** * Reserves appropriate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map. + * This is a blocking call, and can block upto the given query's timeout */ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java index e02271e57481..7bf1b002e165 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java @@ -67,7 +67,14 @@ public void testInterleavedReserveAndRemove() .setGranularity(Granularities.ALL) .setContext(ImmutableMap.of("timeout", 0)) // Query can block indefinitely .build(); - + // Sanity checks that the query will acquire exactly one merge buffer. This safeguards the test being useful in + // case the merge buffer acquisition code changes to acquire less than one merge buffer (the test would be + // useless in that case) or more than one merge buffer (the test would incorrectly fail in that case) + Assert.assertTrue( + GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(config, query) + + GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(query) + == 1 + ); // Blocking pool with a single buffer, which means only one of the queries can succeed at a time BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1); @@ -83,11 +90,6 @@ public void testInterleavedReserveAndRemove() // THREAD 1 executor.submit(() -> { - // Sanity checks that the query will acquire exactly one merge buffer. This safeguards the test being useful in - // case the merge buffer acquisition code changes to acquire less than one merge buffer (the test would be - // useless in that case) or more than one merge buffer (the test would incorrectly fail in that case) - assert (GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(config, query) - + GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(query) == 1); QueryResourceId queryResourceId1 = new QueryResourceId("test-id-1") { @@ -122,8 +124,6 @@ public boolean equals(Object o) // THREAD 2 executor.submit(() -> { - assert (GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(config, query) - + GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(query) == 1); try { reserveCalledByFirstThread.await(); } From d6713b63b21f6ef6cf3820a0fa21c0bbd821ac4a Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 3 May 2024 11:04:55 +0530 Subject: [PATCH 3/7] changes --- .../GroupByResourcesReservationPool.java | 51 +++++++++----- .../druid/query/groupby/GroupingEngine.java | 2 + .../GroupByResourcesReservationPoolTest.java | 66 ++++++++++++++----- 3 files changed, 87 insertions(+), 32 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java index cdcd0db38d06..3fe9379ba42e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java @@ -28,6 +28,7 @@ import javax.inject.Inject; import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; /** * Reserves the {@link GroupByQueryResources} for a given group by query and maps them to the query's resource ID. @@ -67,21 +68,25 @@ * nested ones execute via an unoptimized way. * 3. There's some knowledge to the mergeResults that the query runner passed to it is the one created by the corresponding toolchest's * mergeRunners (which is the typical use case). This is encoded in the argument {@code willMergeRunner}, and is to be set by the callers. - * The only production use case where this isn't true is when the broker is merging the results gathered from the historical) + * The only production use case where this isn't true is when the broker is merging the results gathered from the historical *

* TESTING * Unit tests mimic the broker-historical interaction in many places, which can lead to the code not working as intended because the assumptions don't hold. * In many test cases, there are two nested mergeResults calls, the outer call mimics what the broker does, while the inner one mimics what the historical does, * and the assumption (1) fails. Therefore, the testing code should assign a unique resource id b/w each mergeResults call, and also make sure that the top level mergeResults * would have willMergeRunner = false, since it's being called on top of a mergeResults's runner, while the inner one would have willMergeRunner = true because its being - * called on actual runners (as it happens in the brokers, and the historicals) + * called on actual runners (as it happens in the brokers, and the historicals). + *

+ * There is a test in GroupByResourcesReservationPoolTest that checks for deadlocks when the operations are interleaved in a + * certain maanner. It is ignored because it sleeps and can increase time when the test suite is run. Developers making any changes + * to this class, or a related class should manually verify that all the tests in the test class are running as expected. */ public class GroupByResourcesReservationPool { /** * Map of query's resource id -> group by resources reserved for the query to execute */ - final ConcurrentHashMap pool = new ConcurrentHashMap<>(); + final ConcurrentHashMap> pool = new ConcurrentHashMap<>(); /** * Buffer pool from where the merge buffers are picked and reserved @@ -105,23 +110,34 @@ public GroupByResourcesReservationPool( /** * Reserves appropriate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map. - * This is a blocking call, and can block upto the given query's timeout + * This is a blocking call, and can block up to the given query's timeout */ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner) { if (queryResourceId == null) { throw DruidException.defensive("Query resource id must be populated"); } + + // First check if the query resource id is present in the map, and if not, populate a dummy reference. This will + // block other threads from populating the map with the same query id, and is essentially same as reserving a spot in + // the map for the given query id. Since the actual allocation of the resource might take longer than expected, we + // do it out of the critical section, once we have "reserved" the spot + AtomicReference reference = new AtomicReference<>(null); + AtomicReference existingResource = pool.putIfAbsent(queryResourceId, reference); + + // Multiple attempts made to allocate the query resource for a given resource id. Throw an exception + //noinspection VariableNotUsedInsideIf + if (existingResource != null) { + throw DruidException.defensive("Resource with the given identifier [%s] is already present", queryResourceId); + } + + // We have reserved a spot in the map. Now begin the blocking call. GroupByQueryResources resources = GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig); - pool.compute(queryResourceId, (id, existingResource) -> { - if (existingResource != null) { - resources.close(); - throw DruidException.defensive("Resource with the given identifier [%s] is already present", id); - } - return resources; - }); + // Resources have been allocated, spot has been reserved. The reference would ALWAYS refer to 'null'. Refer the + // allocated resources from it + reference.compareAndSet(null, resources); } /** @@ -130,7 +146,9 @@ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, @Nullable public GroupByQueryResources fetch(QueryResourceId queryResourceId) { - return pool.get(queryResourceId); + GroupByQueryResources resource = pool.get(queryResourceId).get(); + assert resource != null; + return resource; } /** @@ -138,9 +156,12 @@ public GroupByQueryResources fetch(QueryResourceId queryResourceId) */ public void clean(QueryResourceId queryResourceId) { - GroupByQueryResources resources = pool.remove(queryResourceId); - if (resources != null) { - resources.close(); + AtomicReference resourcesReference = pool.remove(queryResourceId); + if (resourcesReference != null) { + GroupByQueryResources resource = resourcesReference.get(); + // Reference should refer to a non-empty resource + assert resource != null; + resource.close(); } } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java index 5fa26d3fa2b4..6451fb9b943d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java @@ -142,6 +142,8 @@ public GroupingEngine( * {@link GroupByMergingQueryRunner} for a particular query. The resources are to be acquired once throughout the * execution of the query, or need to be re-acquired (if needed). Users must ensure that throughout the execution, * a query already holding the resources shouldn't request for more resources, because that can cause deadlocks. + *

+ * This method throws an exception if it is not able to allocate sufficient resources required for the query to succeed */ public static GroupByQueryResources prepareResource( GroupByQuery query, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java index 7bf1b002e165..53155633bbd0 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java @@ -23,12 +23,14 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.DefaultBlockingPool; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.QueryResourceId; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import java.nio.ByteBuffer; @@ -38,6 +40,24 @@ public class GroupByResourcesReservationPoolTest { + /** + * CONFIG + QUERY require exactly 1 merge buffer to succeed if 'willMergeRunners' is true while allocating the resources + */ + private static final GroupByQueryConfig CONFIG = new GroupByQueryConfig(); + private static final GroupByQuery QUERY = GroupByQuery.builder() + .setInterval(Intervals.ETERNITY) + .setDataSource("foo") + .setDimensions( + ImmutableList.of( + new DefaultDimensionSpec("dim2", "_d0") + ) + ) + .setGranularity(Granularities.ALL) + .setContext( + ImmutableMap.of("timeout", 0) + ) // Query can block indefinitely + .build(); + /** * This test confirms that the interleaved GroupByResourcesReservationPool.reserve() and GroupByResourcesReservationPool.clean() * between multiple threads succeed. It is specifically designed to test the case when the operations are interleaved in the @@ -53,33 +73,28 @@ public class GroupByResourcesReservationPoolTest * The test should complete under 10 seconds, and the majority of the time would be consumed by waiting for the thread * that sleeps for 5 seconds */ + @Ignore( + "Isn't run as a part of CI since it sleeps for 5 seconds. Callers must run the test manually if any changes are made " + + "to the corresponding class" + ) @Test(timeout = 100_000L) public void testInterleavedReserveAndRemove() { ExecutorService executor = Execs.multiThreaded(3, "group-by-resources-reservation-pool-test-%d"); - // Group by query + query configuration that would try to acquire exactly 1 merge buffer - GroupByQueryConfig config = new GroupByQueryConfig(); - GroupByQuery query = GroupByQuery.builder() - .setInterval(Intervals.ETERNITY) - .setDataSource("foo") - .setDimensions(ImmutableList.of(new DefaultDimensionSpec("dim2", "_d0"))) - .setGranularity(Granularities.ALL) - .setContext(ImmutableMap.of("timeout", 0)) // Query can block indefinitely - .build(); // Sanity checks that the query will acquire exactly one merge buffer. This safeguards the test being useful in // case the merge buffer acquisition code changes to acquire less than one merge buffer (the test would be // useless in that case) or more than one merge buffer (the test would incorrectly fail in that case) - Assert.assertTrue( - GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(config, query) - + GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(query) - == 1 + Assert.assertEquals( + 1, + GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(CONFIG, QUERY) + + GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(QUERY) ); // Blocking pool with a single buffer, which means only one of the queries can succeed at a time BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1); GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(mergeBufferPool, config); + new GroupByResourcesReservationPool(mergeBufferPool, CONFIG); // Latch indicating that the first thread has called reservationPool.reserve() CountDownLatch reserveCalledByFirstThread = new CountDownLatch(1); @@ -110,7 +125,7 @@ public boolean equals(Object o) return super.equals(o); } }; - groupByResourcesReservationPool.reserve(queryResourceId1, query, true); + groupByResourcesReservationPool.reserve(queryResourceId1, QUERY, true); reserveCalledByFirstThread.countDown(); try { reserveCalledBySecondThread.await(); @@ -146,10 +161,10 @@ public boolean equals(Object o) } }; - // Since the reserve() call is blocking, we need to execute it separately, so that we can countdown the latch + // Since the reserve() call is blocking, we need to execute it separately, so that we can count down the latch // and inform Thread1 the reserve call has been made by this thread executor.submit(() -> { - groupByResourcesReservationPool.reserve(queryResourceId2, query, true); + groupByResourcesReservationPool.reserve(queryResourceId2, QUERY, true); threadsCompleted.countDown(); }); try { @@ -171,4 +186,21 @@ public boolean equals(Object o) Assert.fail("Interrupted while waiting for the threads to complete"); } } + + @Test + public void testMultipleAllocationAttemptsFail() + { + BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1); + GroupByResourcesReservationPool groupByResourcesReservationPool = + new GroupByResourcesReservationPool(mergeBufferPool, CONFIG); + QueryResourceId queryResourceId = new QueryResourceId("test-id"); + + groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true); + + Assert.assertThrows( + DruidException.class, + () -> groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true) + ); + + } } From 8f68b2b1d111bc6e2a24d6cf73a9016bd6c3400f Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 3 May 2024 11:35:35 +0530 Subject: [PATCH 4/7] more changes --- .../GroupByResourcesReservationPoolTest.java | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java index 53155633bbd0..8d0f2d9e37dc 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java @@ -188,7 +188,7 @@ public boolean equals(Object o) } @Test - public void testMultipleAllocationAttemptsFail() + public void testMultipleSimultaneousAllocationAttemptsFail() { BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1); GroupByResourcesReservationPool groupByResourcesReservationPool = @@ -201,6 +201,27 @@ public void testMultipleAllocationAttemptsFail() DruidException.class, () -> groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true) ); + } + + @Test + public void testMultipleSequentialAllocationAttemptsSucceed() + { + BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1); + GroupByResourcesReservationPool groupByResourcesReservationPool = + new GroupByResourcesReservationPool(mergeBufferPool, CONFIG); + QueryResourceId queryResourceId = new QueryResourceId("test-id"); + + groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true); + GroupByQueryResources oldResources = groupByResourcesReservationPool.fetch(queryResourceId); + + // Cleanup the resources + groupByResourcesReservationPool.clean(queryResourceId); + + // Repeat the calls + groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true); + GroupByQueryResources newResources = groupByResourcesReservationPool.fetch(queryResourceId); + Assert.assertNotNull(newResources); + Assert.assertNotSame(oldResources, newResources); } } From 5cb020c94c803731491506e48a7e082910cc37cd Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 7 May 2024 01:26:11 +0530 Subject: [PATCH 5/7] more changes --- .../GroupByResourcesReservationPool.java | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java index 3fe9379ba42e..355e1d9bb112 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java @@ -131,9 +131,16 @@ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, throw DruidException.defensive("Resource with the given identifier [%s] is already present", queryResourceId); } - // We have reserved a spot in the map. Now begin the blocking call. - GroupByQueryResources resources = - GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig); + GroupByQueryResources resources; + try { + // We have reserved a spot in the map. Now begin the blocking call. + resources = GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig); + } + catch (Exception e) { + // Unable to allocate the resources, perform cleanup and rethrow the exception + pool.remove(queryResourceId); + throw e; + } // Resources have been allocated, spot has been reserved. The reference would ALWAYS refer to 'null'. Refer the // allocated resources from it @@ -146,8 +153,17 @@ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, @Nullable public GroupByQueryResources fetch(QueryResourceId queryResourceId) { - GroupByQueryResources resource = pool.get(queryResourceId).get(); - assert resource != null; + AtomicReference resourcesReference = pool.get(queryResourceId); + if (resourcesReference == null) { + // There weren't any resources allocated corresponding to the provided resource id + return null; + } + GroupByQueryResources resource = resourcesReference.get(); + if (resource == null) { + throw DruidException.defensive( + "Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found" + ); + } return resource; } @@ -160,7 +176,11 @@ public void clean(QueryResourceId queryResourceId) if (resourcesReference != null) { GroupByQueryResources resource = resourcesReference.get(); // Reference should refer to a non-empty resource - assert resource != null; + if (resource == null) { + throw DruidException.defensive( + "Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found" + ); + } resource.close(); } } From dedffc1bd17b038ca5dfaae1a3c2e2faa9fc3724 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 7 May 2024 02:46:57 +0530 Subject: [PATCH 6/7] message --- .../query/groupby/GroupByResourcesReservationPool.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java index 355e1d9bb112..cf675eca42f9 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java @@ -161,7 +161,8 @@ public GroupByQueryResources fetch(QueryResourceId queryResourceId) GroupByQueryResources resource = resourcesReference.get(); if (resource == null) { throw DruidException.defensive( - "Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found" + "Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found", + queryResourceId ); } return resource; @@ -178,7 +179,8 @@ public void clean(QueryResourceId queryResourceId) // Reference should refer to a non-empty resource if (resource == null) { throw DruidException.defensive( - "Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found" + "Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found", + queryResourceId ); } resource.close(); From a139107f50e66672ebe14bf912761da8d128ccc9 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 8 May 2024 09:14:56 +0530 Subject: [PATCH 7/7] review --- .../druid/query/groupby/GroupByResourcesReservationPool.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java index cf675eca42f9..9c65ce445b1b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java @@ -136,10 +136,10 @@ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, // We have reserved a spot in the map. Now begin the blocking call. resources = GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig); } - catch (Exception e) { + catch (Throwable t) { // Unable to allocate the resources, perform cleanup and rethrow the exception pool.remove(queryResourceId); - throw e; + throw t; } // Resources have been allocated, spot has been reserved. The reference would ALWAYS refer to 'null'. Refer the