diff --git a/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java b/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java index 9e9a7d77df57..b779fc29d356 100644 --- a/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java +++ b/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java @@ -29,8 +29,6 @@ import java.util.Comparator; import java.util.function.BinaryOperator; -/** - */ public class CombiningSequence implements Sequence { public static CombiningSequence create( @@ -76,9 +74,22 @@ public Yielder toYielder(OutType initValue, final YieldingAcc new CombiningYieldingAccumulator<>(ordering, mergeFn, accumulator); combiningAccumulator.setRetVal(initValue); - Yielder baseYielder = baseSequence.toYielder(null, combiningAccumulator); - return makeYielder(baseYielder, combiningAccumulator, false); + final Yielder baseYielder = baseSequence.toYielder(null, combiningAccumulator); + + try { + return makeYielder(baseYielder, combiningAccumulator, false); + } + catch (Throwable t1) { + try { + baseYielder.close(); + } + catch (Throwable t2) { + t1.addSuppressed(t2); + } + + throw t1; + } } private Yielder makeYielder( diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ExplodingSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ExplodingSequence.java new file mode 100644 index 000000000000..5468a5d86033 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ExplodingSequence.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.guava; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Wraps an underlying sequence and allows us to force it to explode at various points. + */ +public class ExplodingSequence extends YieldingSequenceBase +{ + private final Sequence baseSequence; + private final boolean getThrowsException; + private final boolean closeThrowsException; + private final AtomicLong closed = new AtomicLong(); + + public ExplodingSequence(Sequence baseSequence, boolean getThrowsException, boolean closeThrowsException) + { + this.baseSequence = baseSequence; + this.getThrowsException = getThrowsException; + this.closeThrowsException = closeThrowsException; + } + + @Override + public Yielder toYielder( + final OutType initValue, + final YieldingAccumulator accumulator + ) + { + return wrapYielder(baseSequence.toYielder(initValue, accumulator)); + } + + public long getCloseCount() + { + return closed.get(); + } + + private Yielder wrapYielder(final Yielder baseYielder) + { + return new Yielder() + { + @Override + public OutType get() + { + if (getThrowsException) { + throw new RuntimeException("get"); + } else { + return baseYielder.get(); + } + } + + @Override + public Yielder next(OutType initValue) + { + return wrapYielder(baseYielder.next(initValue)); + } + + @Override + public boolean isDone() + { + return baseYielder.isDone(); + } + + @Override + public void close() throws IOException + { + closed.incrementAndGet(); + + if (closeThrowsException) { + throw new IOException("close"); + } else { + baseYielder.close(); + } + } + }; + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java index 912775cba8ed..90f9b9a48ff3 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java @@ -58,38 +58,62 @@ public Yielder toYielder(OutType initValue, YieldingAccumulat ) ); - pQueue = baseSequences.accumulate( - pQueue, - (queue, in) -> { - final Yielder yielder = in.toYielder( - null, - new YieldingAccumulator() - { - @Override - public T accumulate(T accumulated, T in) + try { + pQueue = baseSequences.accumulate( + pQueue, + (queue, in) -> { + final Yielder yielder = in.toYielder( + null, + new YieldingAccumulator() { - yield(); - return in; + @Override + public T accumulate(T accumulated, T in) + { + yield(); + return in; + } } + ); + + if (!yielder.isDone()) { + try { + queue.add(yielder); } - ); + catch (Throwable t1) { + try { + yielder.close(); + } + catch (Throwable t2) { + t1.addSuppressed(t2); + } - if (!yielder.isDone()) { - queue.add(yielder); - } else { - try { - yielder.close(); - } - catch (IOException e) { - throw new RuntimeException(e); + throw t1; + } + } else { + try { + yielder.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } } + + return queue; } + ); - return queue; - } - ); + return makeYielder(pQueue, initValue, accumulator); + } + catch (Throwable t1) { + try { + closeAll(pQueue); + } + catch (Throwable t2) { + t1.addSuppressed(t2); + } - return makeYielder(pQueue, initValue, accumulator); + throw t1; + } } private Yielder makeYielder( @@ -101,8 +125,22 @@ private Yielder makeYielder( OutType retVal = initVal; while (!accumulator.yielded() && !pQueue.isEmpty()) { Yielder yielder = pQueue.remove(); - retVal = accumulator.accumulate(retVal, yielder.get()); - yielder = yielder.next(null); + + try { + retVal = accumulator.accumulate(retVal, yielder.get()); + yielder = yielder.next(null); + } + catch (Throwable t1) { + try { + yielder.close(); + } + catch (Throwable t2) { + t1.addSuppressed(t2); + } + + throw t1; + } + if (yielder.isDone()) { try { yielder.close(); @@ -144,12 +182,21 @@ public boolean isDone() @Override public void close() throws IOException { - Closer closer = Closer.create(); - while (!pQueue.isEmpty()) { - closer.register(pQueue.remove()); - } - closer.close(); + closeAll(pQueue); } }; } + + private static void closeAll(final PriorityQueue> pQueue) throws IOException + { + Closer closer = Closer.create(); + while (!pQueue.isEmpty()) { + final Yielder yielder = pQueue.poll(); + if (yielder != null) { + // Note: yielder can be null if our comparator threw an exception during queue.add. + closer.register(yielder); + } + } + closer.close(); + } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/YieldingSequenceBase.java b/core/src/main/java/org/apache/druid/java/util/common/guava/YieldingSequenceBase.java index c5ed0111d074..9247989f5f64 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/YieldingSequenceBase.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/YieldingSequenceBase.java @@ -19,6 +19,8 @@ package org.apache.druid.java.util.common.guava; +import java.io.IOException; + /** * A Sequence that is based entirely on the Yielder implementation. *

@@ -29,16 +31,33 @@ public abstract class YieldingSequenceBase implements Sequence @Override public OutType accumulate(OutType initValue, Accumulator accumulator) { + final OutType retVal; Yielder yielder = toYielder(initValue, YieldingAccumulators.fromAccumulator(accumulator)); try { while (!yielder.isDone()) { yielder = yielder.next(yielder.get()); } - return yielder.get(); + retVal = yielder.get(); } - finally { - CloseQuietly.close(yielder); + catch (Throwable t1) { + try { + yielder.close(); + } + catch (Throwable t2) { + t1.addSuppressed(t2); + } + + throw t1; } + + try { + yielder.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + return retVal; } } diff --git a/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java b/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java index 9f64d6732863..b8872f08d385 100644 --- a/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java +++ b/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java @@ -20,17 +20,21 @@ package org.apache.druid.common.guava; import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.guava.ExplodingSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.YieldingAccumulator; +import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -39,6 +43,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -194,6 +199,40 @@ public void testNothing() throws Exception testCombining(Collections.emptyList(), Collections.emptyList()); } + @Test + public void testExplodingSequence() + { + final ExplodingSequence bomb = + new ExplodingSequence<>(Sequences.simple(ImmutableList.of(1, 2, 2)), false, true); + + final CombiningSequence combiningSequence = + CombiningSequence.create(bomb, Comparator.naturalOrder(), (a, b) -> a); + + try { + combiningSequence.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Integer accumulate(Integer accumulated, Integer in) + { + if (in > 1) { + throw new RuntimeException("boom"); + } + + return in; + } + } + ); + Assert.fail("Expected exception"); + } + catch (Exception e) { + Assert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("boom"))); + } + + Assert.assertEquals("Closes resources", 1, bomb.getCloseCount()); + } + private void testCombining(List> pairs, List> expected) throws Exception { diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/MergeSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/MergeSequenceTest.java index 61af63869487..f7817c75293a 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/guava/MergeSequenceTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/guava/MergeSequenceTest.java @@ -19,10 +19,13 @@ package org.apache.druid.java.util.common.guava; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; -import junit.framework.Assert; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; import java.util.ArrayList; import java.util.Arrays; @@ -41,7 +44,10 @@ public void testSanity() throws Exception TestSequence.create(4, 6, 8) ); - MergeSequence seq = new MergeSequence<>(Ordering.natural(), (Sequence) Sequences.simple(testSeqs)); + MergeSequence seq = new MergeSequence<>( + Ordering.natural(), + (Sequence) Sequences.simple(testSeqs) + ); SequenceTestHelper.testAll(seq, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 9)); for (TestSequence sequence : testSeqs) { @@ -58,7 +64,10 @@ public void testWorksWhenBeginningOutOfOrder() throws Exception TestSequence.create(4, 6, 8) ); - MergeSequence seq = new MergeSequence<>(Ordering.natural(), (Sequence) Sequences.simple(testSeqs)); + MergeSequence seq = new MergeSequence<>( + Ordering.natural(), + (Sequence) Sequences.simple(testSeqs) + ); SequenceTestHelper.testAll(seq, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 9)); for (TestSequence sequence : testSeqs) { @@ -76,7 +85,10 @@ public void testMergeEmpties() throws Exception TestSequence.create(4, 6, 8) ); - MergeSequence seq = new MergeSequence<>(Ordering.natural(), (Sequence) Sequences.simple(testSeqs)); + MergeSequence seq = new MergeSequence<>( + Ordering.natural(), + (Sequence) Sequences.simple(testSeqs) + ); SequenceTestHelper.testAll(seq, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 9)); for (TestSequence sequence : testSeqs) { @@ -94,7 +106,10 @@ public void testMergeEmpties1() throws Exception TestSequence.create(4, 6, 8) ); - MergeSequence seq = new MergeSequence<>(Ordering.natural(), (Sequence) Sequences.simple(testSeqs)); + MergeSequence seq = new MergeSequence<>( + Ordering.natural(), + (Sequence) Sequences.simple(testSeqs) + ); SequenceTestHelper.testAll(seq, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 9)); for (TestSequence sequence : testSeqs) { @@ -113,7 +128,10 @@ public void testMergeEmpties2() throws Exception TestSequence.create() ); - MergeSequence seq = new MergeSequence<>(Ordering.natural(), (Sequence) Sequences.simple(testSeqs)); + MergeSequence seq = new MergeSequence<>( + Ordering.natural(), + (Sequence) Sequences.simple(testSeqs) + ); SequenceTestHelper.testAll(seq, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 9)); for (TestSequence sequence : testSeqs) { @@ -130,7 +148,10 @@ public void testScrewsUpOnOutOfOrder() throws Exception TestSequence.create(4, 6) ); - MergeSequence seq = new MergeSequence<>(Ordering.natural(), (Sequence) Sequences.simple(testSeqs)); + MergeSequence seq = new MergeSequence<>( + Ordering.natural(), + (Sequence) Sequences.simple(testSeqs) + ); SequenceTestHelper.testAll(seq, Arrays.asList(1, 2, 3, 4, 5, 4, 6, 7, 8, 9)); for (TestSequence sequence : testSeqs) { @@ -169,4 +190,116 @@ public void testMergeOne() throws Exception SequenceTestHelper.testAll(mergeOne, Collections.singletonList(1)); } + @Test + public void testTwoExplodingOnGetSequences() + { + final ExplodingSequence bomb1 = + new ExplodingSequence<>(Sequences.simple(ImmutableList.of(1, 2, 2)), true, false); + final ExplodingSequence bomb2 = + new ExplodingSequence<>(Sequences.simple(ImmutableList.of(1, 2, 2)), true, false); + + final MergeSequence mergeSequence = + new MergeSequence<>( + Ordering.natural(), + Sequences.simple(ImmutableList.of(bomb1, bomb2)) + ); + + try { + mergeSequence.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Integer accumulate(Integer accumulated, Integer in) + { + return in; + } + } + ); + Assert.fail("Expected exception"); + } + catch (Exception e) { + Assert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("get"))); + } + + Assert.assertEquals("Closes resources (1)", 1, bomb1.getCloseCount()); + Assert.assertEquals("Closes resources (2)", 1, bomb2.getCloseCount()); + } + + @Test + public void testTwoExplodingOnCloseSequences() + { + final ExplodingSequence bomb1 = + new ExplodingSequence<>(Sequences.simple(ImmutableList.of(1, 2, 2)), false, true); + final ExplodingSequence bomb2 = + new ExplodingSequence<>(Sequences.simple(ImmutableList.of(1, 2, 2)), false, true); + + final MergeSequence mergeSequence = + new MergeSequence<>( + Ordering.natural(), + Sequences.simple(ImmutableList.of(bomb1, bomb2)) + ); + + try { + mergeSequence.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Integer accumulate(Integer accumulated, Integer in) + { + if (in > 1) { + throw new RuntimeException("boom"); + } + + return in; + } + } + ); + Assert.fail("Expected exception"); + } + catch (Exception e) { + Assert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("boom"))); + } + + Assert.assertEquals("Closes resources (1)", 1, bomb1.getCloseCount()); + Assert.assertEquals("Closes resources (2)", 1, bomb2.getCloseCount()); + } + + @Test + public void testOneEmptyOneExplodingSequence() + { + final ExplodingSequence bomb = + new ExplodingSequence<>(Sequences.simple(ImmutableList.of(1, 2, 2)), false, true); + + final MergeSequence mergeSequence = + new MergeSequence<>( + Ordering.natural(), + Sequences.simple(ImmutableList.of(Sequences.empty(), bomb)) + ); + + try { + mergeSequence.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Integer accumulate(Integer accumulated, Integer in) + { + if (in > 1) { + throw new RuntimeException("boom"); + } + + return in; + } + } + ); + Assert.fail("Expected exception"); + } + catch (Exception e) { + Assert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("boom"))); + } + + Assert.assertEquals("Closes resources", 1, bomb.getCloseCount()); + } } diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/YieldingSequenceBaseTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/YieldingSequenceBaseTest.java new file mode 100644 index 000000000000..19e396bc064c --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/common/guava/YieldingSequenceBaseTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.guava; + +import com.google.common.collect.ImmutableList; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.internal.matchers.ThrowableCauseMatcher; +import org.junit.internal.matchers.ThrowableMessageMatcher; + +import java.util.ArrayList; + +public class YieldingSequenceBaseTest +{ + @Test + public void testAccumulate() + { + final ExplodingSequence sequence = new ExplodingSequence<>( + Sequences.simple(ImmutableList.of(1, 2, 3)), + false, + false + ); + + Assert.assertEquals(ImmutableList.of(1, 2, 3), sequence.accumulate(new ArrayList<>(), Accumulators.list())); + Assert.assertEquals("Closes resources", 1, sequence.getCloseCount()); + } + + @Test + public void testExceptionDuringGet() + { + final ExplodingSequence sequence = new ExplodingSequence<>( + Sequences.simple(ImmutableList.of(1, 2, 3)), + true, + false + ); + + try { + sequence.accumulate(new ArrayList<>(), Accumulators.list()); + Assert.fail("Expected exception"); + } + catch (Exception e) { + Assert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("get"))); + } + + Assert.assertEquals("Closes resources", 1, sequence.getCloseCount()); + } + + @Test + public void testExceptionDuringClose() + { + final ExplodingSequence sequence = new ExplodingSequence<>( + Sequences.simple(ImmutableList.of(1, 2, 3)), + false, + true + ); + + try { + sequence.accumulate(new ArrayList<>(), Accumulators.list()); + Assert.fail("Expected exception"); + } + catch (Exception e) { + Assert.assertThat( + e, + + // Wrapped one level deep because it's an IOException + ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("close"))) + ); + } + + Assert.assertEquals("Closes resources", 1, sequence.getCloseCount()); + } + + @Test + public void testExceptionDuringGetAndClose() + { + final ExplodingSequence sequence = new ExplodingSequence<>( + Sequences.simple(ImmutableList.of(1, 2, 3)), + true, + true + ); + + try { + sequence.accumulate(new ArrayList<>(), Accumulators.list()); + Assert.fail("Expected exception"); + } + catch (Exception e) { + Assert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("get"))); + } + + Assert.assertEquals("Closes resources", 1, sequence.getCloseCount()); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index 8233315d5e83..52066ab1f25e 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -46,6 +46,7 @@ import org.apache.druid.segment.Segment; import org.joda.time.Interval; +import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -83,7 +84,7 @@ public QueryRunner createRunner(Segment segment) @Override public QueryRunner mergeRunners( - ExecutorService queryExecutor, + final ExecutorService queryExecutor, final Iterable> queryRunners ) { @@ -122,14 +123,19 @@ public QueryRunner mergeRunners( : query.getMaxRowsQueuedForOrdering()); if (query.getScanRowsLimit() <= maxRowsQueuedForOrdering) { // Use priority queue strategy - return priorityQueueSortAndLimit( - Sequences.concat(Sequences.map( - Sequences.simple(queryRunnersOrdered), - input -> input.run(queryPlus, responseContext) - )), - query, - intervalsOrdered - ); + try { + return priorityQueueSortAndLimit( + Sequences.concat(Sequences.map( + Sequences.simple(queryRunnersOrdered), + input -> input.run(queryPlus, responseContext) + )), + query, + intervalsOrdered + ); + } + catch (IOException e) { + throw new RuntimeException(e); + } } else { // Use n-way merge strategy List>> intervalsAndRunnersOrdered = new ArrayList<>(); @@ -149,11 +155,11 @@ public QueryRunner mergeRunners( // query runners for that segment LinkedHashMap>>> partitionsGroupedByInterval = intervalsAndRunnersOrdered.stream() - .collect(Collectors.groupingBy( - x -> x.lhs, - LinkedHashMap::new, - Collectors.toList() - )); + .collect(Collectors.groupingBy( + x -> x.lhs, + LinkedHashMap::new, + Collectors.toList() + )); // Find the segment with the largest numbers of partitions. This will be used to compare with the // maxSegmentPartitionsOrderedInMemory limit to determine if the query is at risk of consuming too much memory. @@ -203,7 +209,7 @@ Sequence priorityQueueSortAndLimit( Sequence inputSequence, ScanQuery scanQuery, List intervalsOrdered - ) + ) throws IOException { Comparator priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); @@ -232,48 +238,54 @@ public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue i } } ); - boolean doneScanning = yielder.isDone(); - // We need to scan limit elements and anything else in the last segment - int numRowsScanned = 0; - Interval finalInterval = null; - while (!doneScanning) { - ScanResultValue next = yielder.get(); - List singleEventScanResultValues = next.toSingleEventScanResultValues(); - for (ScanResultValue srv : singleEventScanResultValues) { - numRowsScanned++; - // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list - // needs to be preserved for queries using the compactedList result format - q.offer(srv); - if (q.size() > limit) { - q.poll(); - } - // Finish scanning the interval containing the limit row - if (numRowsScanned > limit && finalInterval == null) { - long timestampOfLimitRow = srv.getFirstEventTimestamp(scanQuery.getResultFormat()); - for (Interval interval : intervalsOrdered) { - if (interval.contains(timestampOfLimitRow)) { - finalInterval = interval; - } + try { + boolean doneScanning = yielder.isDone(); + // We need to scan limit elements and anything else in the last segment + int numRowsScanned = 0; + Interval finalInterval = null; + while (!doneScanning) { + ScanResultValue next = yielder.get(); + List singleEventScanResultValues = next.toSingleEventScanResultValues(); + for (ScanResultValue srv : singleEventScanResultValues) { + numRowsScanned++; + // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list + // needs to be preserved for queries using the compactedList result format + q.offer(srv); + if (q.size() > limit) { + q.poll(); } - if (finalInterval == null) { - throw new ISE("WTH??? Row came from an unscanned interval?"); + + // Finish scanning the interval containing the limit row + if (numRowsScanned > limit && finalInterval == null) { + long timestampOfLimitRow = srv.getFirstEventTimestamp(scanQuery.getResultFormat()); + for (Interval interval : intervalsOrdered) { + if (interval.contains(timestampOfLimitRow)) { + finalInterval = interval; + } + } + if (finalInterval == null) { + throw new ISE("WTH??? Row came from an unscanned interval?"); + } } } + yielder = yielder.next(null); + doneScanning = yielder.isDone() || + (finalInterval != null && + !finalInterval.contains(next.getFirstEventTimestamp(scanQuery.getResultFormat()))); + } + // Need to convert to a Deque because Priority Queue's iterator doesn't guarantee that the sorted order + // will be maintained. Deque was chosen over list because its addFirst is O(1). + final Deque sortedElements = new ArrayDeque<>(q.size()); + while (q.size() != 0) { + // addFirst is used since PriorityQueue#poll() dequeues the low-priority (timestamp-wise) events first. + sortedElements.addFirst(q.poll()); } - yielder = yielder.next(null); - doneScanning = yielder.isDone() || - (finalInterval != null && - !finalInterval.contains(next.getFirstEventTimestamp(scanQuery.getResultFormat()))); + return Sequences.simple(sortedElements); } - // Need to convert to a Deque because Priority Queue's iterator doesn't guarantee that the sorted order - // will be maintained. Deque was chosen over list because its addFirst is O(1). - final Deque sortedElements = new ArrayDeque<>(q.size()); - while (q.size() != 0) { - // addFirst is used since PriorityQueue#poll() dequeues the low-priority (timestamp-wise) events first. - sortedElements.addFirst(q.poll()); + finally { + yielder.close(); } - return Sequences.simple(sortedElements); } @VisibleForTesting diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java index 8038d68a2528..801324847d70 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java @@ -43,6 +43,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -114,7 +115,7 @@ public static Iterable constructorFeeder() } @Test - public void testSortAndLimitScanResultValues() + public void testSortAndLimitScanResultValues() throws IOException { List srvs = new ArrayList<>(numElements); List expectedEventTimestamps = new ArrayList<>();