From 8643835feac299d46139ec07656477a10eff7532 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 6 Aug 2020 01:52:40 -0700 Subject: [PATCH 01/11] Remove CloseQuietly and migrate its usages to other methods. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These other methods include: 1) New method CloseableUtils.closeAndWrapExceptions, which wraps IOExceptions in RuntimeExceptions for callers that just want to avoid dealing with checked exceptions. Most usages were migrated to this method, because it looks like they were mainly attempts to avoid declaring a throws clause, and perhaps were unintentionally suppressing IOExceptions. 2) New method CloseableUtils.closeInCatch, designed to properly close something in a catch block without losing exceptions. Some usages from catch blocks were migrated here, when it seemed that they were intended to avoid checked exception handling, and did not really intend to also suppress IOExceptions. 3) New method CloseableUtils.closeAndSuppressExceptions, which sends all exceptions to a "chomper" that consumes them. Nothing is thrown or returned. The behavior is slightly different: with this method, _all_ exceptions are suppressed, not just IOExceptions. Calls that seemed like they had good reason to suppress exceptions were migrated here. 4) Some calls were migrated to try-with-resources, in cases where it appeared that CloseQuietly was being used to avoid throwing an exception in a finally block. 🎵 You don't have to go home, but you can't stay here... 🎵 --- .../input/impl/prefetch/JsonIterator.java | 12 +- .../java/util/common/guava/CloseQuietly.java | 45 --- .../guava/ParallelMergeCombiningSequence.java | 34 ++- .../java/util/http/client/HttpClientInit.java | 35 +-- .../SequenceInputStreamResponseHandler.java | 21 +- .../apache/druid/utils/CloseableUtils.java | 103 ++++++- .../druid/utils/CloseableUtilsTest.java | 276 ++++++++++++++++++ .../AppenderatorDriverRealtimeIndexTask.java | 5 +- .../common/task/RealtimeIndexTask.java | 10 +- .../indexing/input/DruidSegmentReader.java | 11 +- .../indexing/overlord/http/OverlordTest.java | 11 +- .../apache/druid/guice/PropertiesModule.java | 38 ++- .../query/groupby/GroupByQueryEngine.java | 15 +- .../groupby/strategy/GroupByStrategyV2.java | 15 +- .../query/scan/ScanQueryQueryToolChest.java | 4 +- .../druid/query/topn/PooledTopNAlgorithm.java | 4 +- .../QueryableIndexIndexableAdapter.java | 5 +- .../column/StringDictionaryEncodedColumn.java | 12 +- .../BlockLayoutColumnarDoublesSupplier.java | 4 +- .../BlockLayoutColumnarFloatsSupplier.java | 4 +- .../BlockLayoutColumnarLongsSupplier.java | 6 +- .../data/CompressedColumnarIntsSupplier.java | 14 +- .../CompressedVSizeColumnarIntsSupplier.java | 4 +- .../druid/segment/data/GenericIndexed.java | 6 +- .../druid/segment/join/HashJoinSegment.java | 7 +- .../aggregation/AggregationTestHelper.java | 4 +- .../StringColumnAggregationTest.java | 4 +- .../CompressedColumnarIntsSerializerTest.java | 6 +- .../CompressedColumnarIntsSupplierTest.java | 12 +- .../data/CompressedDoublesSerdeTest.java | 4 +- .../data/CompressedFloatsSerdeTest.java | 29 +- .../data/CompressedLongsSerdeTest.java | 4 +- ...ressedVSizeColumnarIntsSerializerTest.java | 7 +- ...mpressedVSizeColumnarIntsSupplierTest.java | 25 +- ...dVSizeColumnarMultiIntsSerializerTest.java | 6 +- .../druid/client/DirectDruidClient.java | 4 +- .../druid/client/JsonParserIterator.java | 4 +- .../druid/client/cache/HybridCache.java | 2 +- .../druid/curator/announcement/Announcer.java | 12 +- .../discovery/CuratorDruidLeaderSelector.java | 13 +- .../CuratorDruidNodeDiscoveryProvider.java | 4 +- .../appenderator/SinkQuerySegmentWalker.java | 5 +- .../firehose/TimedShutoffFirehoseFactory.java | 2 +- .../druid/server/log/FileRequestLogger.java | 18 +- 44 files changed, 603 insertions(+), 263 deletions(-) delete mode 100644 core/src/main/java/org/apache/druid/java/util/common/guava/CloseQuietly.java create mode 100644 core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java index c03e5f6d8e84..5abadb6c2ad1 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java @@ -25,9 +25,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.guava.CloseQuietly; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.utils.CloseableUtils; import java.io.Closeable; import java.io.IOException; @@ -80,7 +79,7 @@ public boolean hasNext() return false; } if (jp.getCurrentToken() == JsonToken.END_ARRAY) { - CloseQuietly.close(jp); + CloseableUtils.closeAndWrapExceptions(jp); return false; } return true; @@ -131,11 +130,6 @@ private void init() @Override public void close() throws IOException { - Closer closer = Closer.create(); - if (jp != null) { - closer.register(jp); - } - closer.register(resourceCloser); - closer.close(); + CloseableUtils.closeAll(jp, resourceCloser); } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/CloseQuietly.java b/core/src/main/java/org/apache/druid/java/util/common/guava/CloseQuietly.java deleted file mode 100644 index 8b4807b90455..000000000000 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/CloseQuietly.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.java.util.common.guava; - -import org.apache.druid.java.util.common.logger.Logger; - -import java.io.Closeable; -import java.io.IOException; - -/** - */ -public class CloseQuietly -{ - private static final Logger log = new Logger(CloseQuietly.class); - - public static void close(Closeable closeable) - { - if (closeable == null) { - return; - } - try { - closeable.close(); - } - catch (IOException e) { - log.error(e, "IOException thrown while closing Closeable."); - } - } -} diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 7ab591a7d3d2..2addbf5bc489 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.CloseableUtils; import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; @@ -175,6 +176,7 @@ static Sequence makeOutputSequenceForQueue( new BaseSequence.IteratorMaker>() { private boolean shouldCancelOnCleanup = true; + @Override public Iterator make() { @@ -463,18 +465,19 @@ private int computeNumTasks() final int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1); - LOG.debug("Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " - + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " - + "pool parallelism: [%s] pool size: [%s] steal count: [%s]", - computedNumParallelTasks, - parallelism, - getPool().getActiveThreadCount(), - runningThreadCount, - submissionCount, - getPool().getQueuedTaskCount(), - getPool().getParallelism(), - getPool().getPoolSize(), - getPool().getStealCount() + LOG.debug( + "Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " + + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " + + "pool parallelism: [%s] pool size: [%s] steal count: [%s]", + computedNumParallelTasks, + parallelism, + getPool().getActiveThreadCount(), + runningThreadCount, + submissionCount, + getPool().getQueuedTaskCount(), + getPool().getParallelism(), + getPool().getPoolSize(), + getPool().getStealCount() ); return computedNumParallelTasks; @@ -609,7 +612,10 @@ protected void compute() // which we want to target a 10ms task run time. smooth this value with a cumulative moving average in order // to prevent normal jitter in processing time from skewing the next yield value too far in any direction final long elapsedNanos = System.nanoTime() - start; - final double nextYieldAfter = Math.max((double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), 1.0); + final double nextYieldAfter = Math.max( + (double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), + 1.0 + ); final long recursionDepth = metricsAccumulator.getTaskCount(); final double cumulativeMovingAverage = (nextYieldAfter + (recursionDepth * yieldAfter)) / (recursionDepth + 1); @@ -1376,6 +1382,6 @@ private static void closeAllCursors(final Collection { Closer closer = Closer.create(); closer.registerAll(cursors); - CloseQuietly.close(closer); + CloseableUtils.closeAndWrapExceptions(closer); } } diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java index d42503912a1c..99db18b18fe6 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java @@ -19,8 +19,8 @@ package org.apache.druid.java.util.http.client; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory; import org.apache.druid.java.util.http.client.pool.ChannelResourceFactory; @@ -40,17 +40,12 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.KeyManagementException; import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** + * */ public class HttpClientInit { @@ -134,11 +129,8 @@ public static ClientBootstrap createBootstrap(Lifecycle lifecycle) public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath, final String keyStorePassword) { - FileInputStream in = null; - try { + try (FileInputStream in = new FileInputStream(keyStorePath)) { final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); - - in = new FileInputStream(keyStorePath); ks.load(in, keyStorePassword.toCharArray()); in.close(); @@ -149,27 +141,10 @@ public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath return sslContext; } - catch (CertificateException e) { - throw new RuntimeException(e); - } - catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } - catch (KeyStoreException e) { - throw new RuntimeException(e); - } - catch (KeyManagementException e) { - throw new RuntimeException(e); - } - catch (FileNotFoundException e) { - throw new RuntimeException(e); - } - catch (IOException e) { + catch (Exception e) { + Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - finally { - CloseQuietly.close(in); - } } private static ClientBootstrap createBootstrap(Lifecycle lifecycle, Timer timer, int bossPoolSize, int workerPoolSize) diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java index c3247d69832b..3f07bf2acb20 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java @@ -20,7 +20,6 @@ package org.apache.druid.java.util.http.client.response; import com.google.common.io.ByteSource; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.logger.Logger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; @@ -57,19 +56,17 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response, TrafficCop trafficCop) { - ChannelBufferInputStream channelStream = null; - try { - channelStream = new ChannelBufferInputStream(response.getContent()); + try (ChannelBufferInputStream channelStream = new ChannelBufferInputStream(response.getContent())) { queue.put(channelStream); } + catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { log.error(e, "Queue appending interrupted"); Thread.currentThread().interrupt(); throw new RuntimeException(e); } - finally { - CloseQuietly.close(channelStream); - } byteCount.addAndGet(response.getContent().readableBytes()); return ClientResponse.finished( new SequenceInputStream( @@ -112,21 +109,19 @@ public ClientResponse handleChunk( final ChannelBuffer channelBuffer = chunk.getContent(); final int bytes = channelBuffer.readableBytes(); if (bytes > 0) { - ChannelBufferInputStream channelStream = null; - try { - channelStream = new ChannelBufferInputStream(channelBuffer); + try (ChannelBufferInputStream channelStream = new ChannelBufferInputStream(channelBuffer)) { queue.put(channelStream); // Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong log.debug("Added stream. Queue length %d", queue.size()); } + catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { log.warn(e, "Thread interrupted while adding to queue"); Thread.currentThread().interrupt(); throw new RuntimeException(e); } - finally { - CloseQuietly.close(channelStream); - } byteCount.addAndGet(bytes); } else { log.debug("Skipping zero length chunk"); diff --git a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java index 768abbe86dcd..72c2c8dbfabe 100644 --- a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java @@ -19,11 +19,17 @@ package org.apache.druid.utils; +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.io.Closer; + +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; +import java.util.function.Consumer; /** - * Methods in this class could have belonged to {@link org.apache.druid.java.util.common.io.Closer}, but not editing + * Methods in this class could have belonged to {@link Closer}, but not editing * that class to keep its source close to Guava source. */ public final class CloseableUtils @@ -34,15 +40,98 @@ public final class CloseableUtils * first.close(); * second.close(); * - * to have safety of {@link org.apache.druid.java.util.common.io.Closer}, but without associated boilerplate code + * to have safety of {@link Closer}, but without associated boilerplate code * of creating a Closer and registering objects in it. */ - public static void closeBoth(Closeable first, Closeable second) throws IOException + public static void closeAll(Closeable first, Closeable... others) throws IOException { - //noinspection EmptyTryBlock - try (Closeable ignore1 = second; - Closeable ignore2 = first) { - // piggy-back try-with-resources semantics + final Closer closer = Closer.create(); + + // Register in reverse order, so we close from first to last. + closer.registerAll(Lists.reverse(Arrays.asList(others))); + closer.register(first); + closer.close(); + } + + /** + * Like {@link Closeable#close()}, but guaranteed to throw {@param caught}. Will add any exceptions encountered + * during closing to {@param caught} using {@link Throwable#addSuppressed(Throwable)}. + * + * Should be used like {@code throw CloseableUtils.closeInCatch(e, closeable)}. (The "throw" is important for + * reachability detection.) + */ + public static RuntimeException closeInCatch( + final E caught, + @Nullable final Closeable closeable + ) throws E + { + if (caught == null) { + // Incorrect usage; throw an exception with an error message that may be useful to the programmer. + final RuntimeException e1 = new IllegalStateException("Must be called with non-null caught exception"); + + if (closeable != null) { + try { + closeable.close(); + } + catch (Throwable e2) { + e1.addSuppressed(e2); + } + } + + throw e1; + } + + if (closeable != null) { + try { + closeable.close(); + } + catch (Throwable e) { + caught.addSuppressed(e); + } + } + + throw caught; + } + + /** + * Like {@link Closeable#close()} but wraps IOExceptions in RuntimeExceptions. + */ + public static void closeAndWrapExceptions(@Nullable final Closeable closeable) + { + if (closeable == null) { + return; + } + + try { + closeable.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Like {@link Closeable#close()} but sends any exceptions to the provided Consumer, and then throws them away. + * + * If the Consumer throws an exception, that exception is thrown by this method. So if your intent is to chomp + * exceptions, you should avoid writing a Consumer that might thrown an exception. + * + * Throwables that are not Exceptions are thrown rather than sent to the Consumer. + */ + public static void closeAndSuppressExceptions( + @Nullable final Closeable closeable, + final Consumer chomper + ) + { + if (closeable == null) { + return; + } + + try { + closeable.close(); + } + catch (Exception e) { + chomper.accept(e); } } diff --git a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java new file mode 100644 index 000000000000..858b06e63dd8 --- /dev/null +++ b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.utils; + +import com.google.common.base.Throwables; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +public class CloseableUtilsTest +{ + private final TestCloseable quietCloseable = new TestCloseable(null); + private final TestCloseable quietCloseable2 = new TestCloseable(null); + private final TestCloseable ioExceptionCloseable = new TestCloseable(new IOException()); + private final TestCloseable runtimeExceptionCloseable = new TestCloseable(new IllegalArgumentException()); + + // For closeAndSuppressException tests. + private final AtomicLong chomped = new AtomicLong(); + private final Consumer chomper = e -> chomped.incrementAndGet(); + + @Test + public void test_closeAll_quiet() throws IOException + { + CloseableUtils.closeAll(quietCloseable, quietCloseable2); + assertClosed(quietCloseable, quietCloseable2); + } + + @Test + public void test_closeAll_loud() + { + Exception e = null; + try { + CloseableUtils.closeAll(quietCloseable, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable); + } + catch (Exception e2) { + e = e2; + } + + assertClosed(quietCloseable, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class)); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(RuntimeException.class)); + } + + @Test + public void test_closeAndWrapExceptions_null() + { + CloseableUtils.closeAndWrapExceptions(null); + // Nothing happens. + } + + @Test + public void test_closeAndWrapExceptions_quiet() + { + CloseableUtils.closeAndWrapExceptions(quietCloseable); + assertClosed(quietCloseable); + } + + @Test + public void test_closeAndWrapExceptions_ioException() + { + Exception e = null; + try { + CloseableUtils.closeAndWrapExceptions(ioExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + assertClosed(ioExceptionCloseable); + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + } + + @Test + public void test_closeAndWrapExceptions_runtimeException() + { + Exception e = null; + try { + CloseableUtils.closeAndWrapExceptions(runtimeExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + assertClosed(runtimeExceptionCloseable); + Assert.assertThat(e, CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + @Test + public void test_closeAndSuppressExceptions_null() + { + CloseableUtils.closeAndSuppressExceptions(null, chomper); + Assert.assertEquals(0, chomped.get()); + } + + @Test + public void test_closeAndSuppressExceptions_quiet() + { + CloseableUtils.closeAndSuppressExceptions(quietCloseable, chomper); + assertClosed(quietCloseable); + Assert.assertEquals(0, chomped.get()); + } + + @Test + public void test_closeAndSuppressExceptions_ioException() + { + CloseableUtils.closeAndSuppressExceptions(ioExceptionCloseable, chomper); + assertClosed(ioExceptionCloseable); + Assert.assertEquals(1, chomped.get()); + } + + @Test + public void test_closeAndSuppressExceptions_runtimeException() + { + CloseableUtils.closeAndSuppressExceptions(runtimeExceptionCloseable, chomper); + assertClosed(runtimeExceptionCloseable); + Assert.assertEquals(1, chomped.get()); + } + + @Test + public void test_closeInCatch_improper() throws Exception + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(null, quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(IllegalStateException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Must be called with non-null caught exception")) + ); + } + + @Test + public void test_closeInCatch_quiet() throws Exception + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(new RuntimeException("this one was caught"), quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + } + + @Test + public void test_closeInCatch_ioException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(new RuntimeException("this one was caught"), ioExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(ioExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IOException.class)); + } + + @Test + public void test_closeInCatch_runtimeException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(new RuntimeException("this one was caught"), runtimeExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(runtimeExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + private static void assertClosed(final TestCloseable... closeables) + { + for (TestCloseable closeable : closeables) { + Assert.assertTrue(closeable.isClosed()); + } + } + + private static class TestCloseable implements Closeable + { + @Nullable + private final Exception e; + private final AtomicBoolean closed = new AtomicBoolean(false); + + TestCloseable(@Nullable Exception e) + { + this.e = e; + } + + @Override + public void close() throws IOException + { + closed.set(true); + if (e != null) { + Throwables.propagateIfInstanceOf(e, IOException.class); + throw Throwables.propagate(e); + } + } + + public boolean isClosed() + { + return closed.get(); + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index d20a8f44dd8b..6b9e222abc1f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -91,6 +91,7 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.partition.NumberedPartialShardSpec; import org.apache.druid.utils.CircularBuffer; +import org.apache.druid.utils.CloseableUtils; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.GET; @@ -440,9 +441,9 @@ public TaskStatus run(final TaskToolbox toolbox) chatHandlerProvider.get().unregister(getId()); } - CloseQuietly.close(firehose); + CloseableUtils.closeAndSuppressExceptions(firehose, e -> log.warn("Failed to close Firehose")); appenderator.close(); - CloseQuietly.close(driver); + CloseableUtils.closeAndSuppressExceptions(driver, e -> log.warn("Failed to close AppenderatorDriver")); toolbox.removeMonitor(metricsMonitor); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java index 79aa3967bc0e..92a3754f00b7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java @@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.Query; @@ -67,6 +66,7 @@ import org.apache.druid.segment.realtime.plumber.VersioningPolicy; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -302,7 +302,11 @@ public String getVersion(final Interval interval) { try { // Side effect: Calling getVersion causes a lock to be acquired - final TimeChunkLockAcquireAction action = new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs); + final TimeChunkLockAcquireAction action = new TimeChunkLockAcquireAction( + TaskLockType.EXCLUSIVE, + interval, + lockTimeoutMs + ); final TaskLock lock = Preconditions.checkNotNull( toolbox.getTaskActionClient().submit(action), "Cannot acquire a lock for interval[%s]", @@ -471,7 +475,7 @@ public void run() } finally { if (firehose != null) { - CloseQuietly.close(firehose); + CloseableUtils.closeAndSuppressExceptions(firehose, e -> log.warn("Failed to close Firehose")); } toolbox.removeMonitor(metricsMonitor); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java index 6460ae43d55d..f6b59e9b4538 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java @@ -32,7 +32,6 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.query.dimension.DefaultDimensionSpec; @@ -48,6 +47,7 @@ import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.segment.filter.Filters; import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; +import org.apache.druid.utils.CloseableUtils; import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; @@ -137,6 +137,7 @@ protected Map toMap(Map intermediateRow) * Map intermediate rows, selecting the dimensions and metrics of this segment reader. * * @param cursor A cursor + * * @return A sequence of intermediate rows */ private Sequence> cursorToSequence( @@ -152,8 +153,9 @@ private Sequence> cursorToSequence( * @param sequence A sequence of intermediate rows generated from a sequence of * cursors in {@link #intermediateRowIterator()} * @param segmentFile The underlying segment file containing the row data + * * @return A CloseableIterator from a sequence of intermediate rows, closing the underlying segment file - * when the iterator is closed. + * when the iterator is closed. */ @VisibleForTesting static CloseableIterator> makeCloseableIteratorFromSequenceAndSegmentFile( @@ -182,10 +184,7 @@ public Map next() @Override public void close() throws IOException { - Closer closer = Closer.create(); - closer.register(rowYielder); - closer.register(segmentFile); - closer.close(); + CloseableUtils.closeAll(rowYielder, segmentFile); } }; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 52373eaaca82..0f35c862bd1c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -60,7 +60,6 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.server.DruidNode; @@ -69,6 +68,7 @@ import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.utils.CloseableUtils; import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.After; @@ -122,8 +122,8 @@ private void setupServerAndCurator() throws Exception private void tearDownServerAndCurator() { - CloseQuietly.close(curator); - CloseQuietly.close(server); + CloseableUtils.closeAndWrapExceptions(curator); + CloseableUtils.closeAndWrapExceptions(server); } @Before @@ -215,7 +215,10 @@ public void testOverlordRun() throws Exception Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort()); final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage); - final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null); + final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter( + taskMaster, + null + ); // Test Overlord resource stuff overlordResource = new OverlordResource( taskMaster, diff --git a/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java b/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java index 4dd7b7045885..d42719c4f021 100644 --- a/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java +++ b/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java @@ -21,13 +21,12 @@ import com.google.inject.Binder; import com.google.inject.Module; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.logger.Logger; +import javax.annotation.Nullable; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -56,15 +55,7 @@ public void configure(Binder binder) props.putAll(systemProps); for (String propertiesFile : propertiesFiles) { - InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile); - try { - if (stream == null) { - File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile)); - if (workingDirectoryFile.exists()) { - stream = new BufferedInputStream(new FileInputStream(workingDirectoryFile)); - } - } - + try (InputStream stream = openPropertiesFile(propertiesFile, systemProps)) { if (stream != null) { log.debug("Loading properties from %s", propertiesFile); try (final InputStreamReader in = new InputStreamReader(stream, StandardCharsets.UTF_8)) { @@ -75,14 +66,29 @@ public void configure(Binder binder) } } } - catch (FileNotFoundException e) { - log.wtf(e, "This can only happen if the .exists() call lied."); - } - finally { - CloseQuietly.close(stream); + catch (IOException e) { + throw new RuntimeException(e); } } binder.bind(Properties.class).toInstance(props); } + + @Nullable + private static InputStream openPropertiesFile(final String propertiesFile, final Properties systemProps) + throws IOException + { + final InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile); + + if (stream != null) { + return stream; + } else { + File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile)); + if (workingDirectoryFile.exists()) { + return new BufferedInputStream(new FileInputStream(workingDirectoryFile)); + } else { + return null; + } + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java index 3a34b76c874a..5b856fbf881a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java @@ -33,7 +33,6 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.FunctionalIterator; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -50,11 +49,11 @@ import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.segment.filter.Filters; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; -import java.io.Closeable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -66,6 +65,7 @@ import java.util.TreeMap; /** + * */ public class GroupByQueryEngine { @@ -131,21 +131,14 @@ public RowIterator make() @Override public void cleanup(RowIterator iterFromMake) { - CloseQuietly.close(iterFromMake); + CloseableUtils.closeAndWrapExceptions(iterFromMake); } } ); } } ), - new Closeable() - { - @Override - public void close() - { - CloseQuietly.close(bufferHolder); - } - } + bufferHolder ) ); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index e81eded2f9c0..9e0372fe2b2b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -34,7 +34,6 @@ import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.collect.Utils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.LazySequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -68,6 +67,7 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.utils.CloseableUtils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -330,8 +330,7 @@ public Sequence processSubqueryResult( ); } catch (Exception ex) { - CloseQuietly.close(resultSupplier); - throw ex; + throw CloseableUtils.closeInCatch(new RuntimeException(ex), resultSupplier); } } @@ -479,8 +478,7 @@ public Sequence processSubtotalsSpec( ); } catch (Exception ex) { - CloseQuietly.close(resultSupplierOne); - throw ex; + throw CloseableUtils.closeInCatch(new RuntimeException(ex), resultSupplierOne); } } @@ -500,7 +498,9 @@ private Sequence processSubtotalsResultAndOptionallyClose( new LazySequence<>( () -> Sequences.withBaggage( memoizedSupplier.get().results(dimsToInclude), - closeOnSequenceRead ? () -> CloseQuietly.close(memoizedSupplier.get()) : () -> {} + closeOnSequenceRead + ? () -> CloseableUtils.closeAndWrapExceptions(memoizedSupplier.get()) + : () -> {} ) ), subtotalQuery, @@ -508,8 +508,7 @@ private Sequence processSubtotalsResultAndOptionallyClose( ); } catch (Exception ex) { - CloseQuietly.close(baseResultsSupplier.get()); - throw ex; + throw CloseableUtils.closeInCatch(new RuntimeException(ex), baseResultsSupplier.get()); } } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index 21550e1280fb..fd65d0f5979e 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -27,7 +27,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.GenericQueryMetricsFactory; @@ -40,6 +39,7 @@ import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.utils.CloseableUtils; import java.util.List; import java.util.Map; @@ -87,7 +87,7 @@ public ScanQueryLimitRowIterator make() @Override public void cleanup(ScanQueryLimitRowIterator iterFromMake) { - CloseQuietly.close(iterFromMake); + CloseableUtils.closeAndWrapExceptions(iterFromMake); } }); }; diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java index 19ebb543578a..6ddda5eb1be8 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java @@ -24,7 +24,6 @@ import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.aggregation.BufferAggregator; @@ -43,6 +42,7 @@ import org.apache.druid.segment.historical.HistoricalCursor; import org.apache.druid.segment.historical.HistoricalDimensionSelector; import org.apache.druid.segment.historical.SingleValueHistoricalDimensionSelector; +import org.apache.druid.utils.CloseableUtils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -784,7 +784,7 @@ public void cleanup(PooledTopNParams params) if (resultsBufHolder != null) { resultsBufHolder.get().clear(); } - CloseQuietly.close(resultsBufHolder); + CloseableUtils.closeAndWrapExceptions(resultsBufHolder); } } diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java index e68a119113ff..73bf1b32e379 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.column.BaseColumn; @@ -39,6 +38,7 @@ import org.apache.druid.segment.data.IndexedIterable; import org.apache.druid.segment.selector.settable.SettableColumnValueSelector; import org.apache.druid.segment.selector.settable.SettableLongColumnValueSelector; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -52,6 +52,7 @@ import java.util.Set; /** + * */ public class QueryableIndexIndexableAdapter implements IndexableAdapter { @@ -280,7 +281,7 @@ public boolean hasTimeAndDimsChangedSinceMark() @Override public void close() { - CloseQuietly.close(closer); + CloseableUtils.closeAndWrapExceptions(closer); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringDictionaryEncodedColumn.java b/processing/src/main/java/org/apache/druid/segment/column/StringDictionaryEncodedColumn.java index ba77d92f15ba..ec556fb4b4cd 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/StringDictionaryEncodedColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/column/StringDictionaryEncodedColumn.java @@ -21,7 +21,6 @@ import com.google.common.base.Predicate; import com.google.common.base.Predicates; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -40,12 +39,14 @@ import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; import org.apache.druid.segment.vector.ReadableVectorOffset; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import java.io.IOException; import java.util.BitSet; /** + * */ public class StringDictionaryEncodedColumn implements DictionaryEncodedColumn { @@ -490,13 +491,6 @@ public int getMaxVectorSize() @Override public void close() throws IOException { - CloseQuietly.close(cachedLookups); - - if (column != null) { - column.close(); - } - if (multiValueColumn != null) { - multiValueColumn.close(); - } + CloseableUtils.closeAll(cachedLookups, column, multiValueColumn); } } diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java index adae6d391b2f..7c6c006f68c6 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java @@ -21,7 +21,7 @@ import com.google.common.base.Supplier; import org.apache.druid.collections.ResourceHolder; -import org.apache.druid.java.util.common.guava.CloseQuietly; +import org.apache.druid.utils.CloseableUtils; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -168,7 +168,7 @@ public void get(final double[] out, final int[] indexes, final int length) protected void loadBuffer(int bufferNum) { - CloseQuietly.close(holder); + CloseableUtils.closeAndWrapExceptions(holder); holder = singleThreadedDoubleBuffers.get(bufferNum); // asDoubleBuffer() makes the doubleBuffer's position = 0 doubleBuffer = holder.get().asDoubleBuffer(); diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java index a7a8deaec1c5..4999e655a116 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java @@ -21,7 +21,7 @@ import com.google.common.base.Supplier; import org.apache.druid.collections.ResourceHolder; -import org.apache.druid.java.util.common.guava.CloseQuietly; +import org.apache.druid.utils.CloseableUtils; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -168,7 +168,7 @@ public void get(final float[] out, final int[] indexes, final int length) protected void loadBuffer(int bufferNum) { - CloseQuietly.close(holder); + CloseableUtils.closeAndWrapExceptions(holder); holder = singleThreadedFloatBuffers.get(bufferNum); // asFloatBuffer() makes the floatBuffer's position = 0 floatBuffer = holder.get().asFloatBuffer(); diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java index 808e7bedce71..c28faa160bd2 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java @@ -21,7 +21,7 @@ import com.google.common.base.Supplier; import org.apache.druid.collections.ResourceHolder; -import org.apache.druid.java.util.common.guava.CloseQuietly; +import org.apache.druid.utils.CloseableUtils; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -83,7 +83,7 @@ public long get(int index) @Override protected void loadBuffer(int bufferNum) { - CloseQuietly.close(holder); + CloseableUtils.closeAndWrapExceptions(holder); holder = singleThreadedLongBuffers.get(bufferNum); buffer = holder.get(); // asLongBuffer() makes the longBuffer's position = 0 @@ -190,7 +190,7 @@ public void get(final long[] out, final int[] indexes, final int length) protected void loadBuffer(int bufferNum) { - CloseQuietly.close(holder); + CloseableUtils.closeAndWrapExceptions(holder); holder = singleThreadedLongBuffers.get(bufferNum); buffer = holder.get(); currBufferNum = bufferNum; diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java index d9b3cf97085e..f55fdedff951 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java @@ -24,13 +24,13 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.MetaSerdeHelper; +import org.apache.druid.utils.CloseableUtils; import java.io.IOException; import java.nio.ByteBuffer; @@ -134,7 +134,11 @@ public static CompressedColumnarIntsSupplier fromByteBuffer(ByteBuffer buffer, B throw new IAE("Unknown version[%s]", versionFromBuffer); } - public static CompressedColumnarIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper mapper) + public static CompressedColumnarIntsSupplier fromByteBuffer( + ByteBuffer buffer, + ByteOrder order, + SmooshedFileMapper mapper + ) { byte versionFromBuffer = buffer.get(); @@ -292,7 +296,9 @@ private class CompressedColumnarInts implements ColumnarInts int currBufferNum = -1; ResourceHolder holder; - /** buffer's position must be 0 */ + /** + * buffer's position must be 0 + */ IntBuffer buffer; @Override @@ -317,7 +323,7 @@ public int get(int index) protected void loadBuffer(int bufferNum) { - CloseQuietly.close(holder); + CloseableUtils.closeAndWrapExceptions(holder); holder = singleThreadedIntBuffers.get(bufferNum); // asIntBuffer() makes the buffer's position = 0 buffer = holder.get().asIntBuffer(); diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java index 84d8b8d7ad72..359fc1337b30 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java @@ -25,13 +25,13 @@ import org.apache.druid.collections.ResourceHolder; import org.apache.druid.common.utils.ByteUtils; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.MetaSerdeHelper; +import org.apache.druid.utils.CloseableUtils; import java.io.IOException; import java.nio.ByteBuffer; @@ -433,7 +433,7 @@ int _get(ByteBuffer buffer, boolean bigEndian, final int bufferIndex) protected void loadBuffer(int bufferNum) { - CloseQuietly.close(holder); + CloseableUtils.closeAndWrapExceptions(holder); holder = singleThreadedBuffers.get(bufferNum); ByteBuffer bb = holder.get(); ByteOrder byteOrder = bb.order(); diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java index 35c9a3fc8ae0..ef8e5cc17d8d 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java @@ -26,7 +26,6 @@ import org.apache.druid.io.Channels; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; @@ -35,6 +34,7 @@ import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.serde.Serializer; import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import java.io.Closeable; @@ -530,13 +530,13 @@ private static GenericIndexed fromIterableVersionOne( headerOut.writeInt(Ints.checkedCast(valuesOut.size())); if (prevVal instanceof Closeable) { - CloseQuietly.close((Closeable) prevVal); + CloseableUtils.closeAndWrapExceptions((Closeable) prevVal); } prevVal = next; } while (objects.hasNext()); if (prevVal instanceof Closeable) { - CloseQuietly.close((Closeable) prevVal); + CloseableUtils.closeAndWrapExceptions((Closeable) prevVal); } } catch (IOException e) { diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java index bcfd109970d5..2fed9972e603 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java @@ -20,13 +20,13 @@ package org.apache.druid.segment.join; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -122,15 +122,14 @@ public Optional acquireReferences() }).orElse(true); } if (acquireFailed) { - CloseQuietly.close(closer); + CloseableUtils.closeAndWrapExceptions(closer); return Optional.empty(); } else { return Optional.of(closer); } } catch (Exception ex) { - CloseQuietly.close(closer); - return Optional.empty(); + throw CloseableUtils.closeInCatch(new RuntimeException(ex), closer); } } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 379b04a64a4b..be709391f5c7 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -39,7 +39,6 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; @@ -77,6 +76,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CloseableUtils; import org.junit.rules.TemporaryFolder; import java.io.Closeable; @@ -593,7 +593,7 @@ public Segment apply(File segmentDir) } finally { for (Segment segment : segments) { - CloseQuietly.close(segment); + CloseableUtils.closeAndWrapExceptions(segment); } } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java index 30419b04740f..2e516cebf63a 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java @@ -28,7 +28,6 @@ import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.NoopInputRowParser; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Druids; import org.apache.druid.query.Result; @@ -41,6 +40,7 @@ import org.apache.druid.segment.Segment; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.After; @@ -133,7 +133,7 @@ public void tearDown() { if (segments != null) { for (Segment seg : segments) { - CloseQuietly.close(seg); + CloseableUtils.closeAndWrapExceptions(seg); } } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java index 43ea5eb08637..52dca67257ca 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java @@ -26,7 +26,6 @@ import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -35,6 +34,7 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -226,7 +226,7 @@ private void checkSerializedSizeAndData(int chunkFactor) throws Exception for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); } private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception @@ -269,7 +269,7 @@ private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); mapper.close(); } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java index a857eb4256c0..01c9cc26dca6 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java @@ -22,9 +22,9 @@ import com.google.common.primitives.Longs; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.segment.CompressedPools; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -58,7 +58,7 @@ public CompressedColumnarIntsSupplierTest(CompressionStrategy compressionStrateg public void setUp() { closer = Closer.create(); - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); columnarInts = null; supplier = null; vals = null; @@ -68,12 +68,12 @@ public void setUp() public void tearDown() throws Exception { closer.close(); - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); } private void setupSimple(final int chunkSize) { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; @@ -97,7 +97,7 @@ private void setupSimpleWithSerde(final int chunkSize) throws IOException private void makeWithSerde(final int chunkSize) throws IOException { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedColumnarIntsSupplier theSupplier = CompressedColumnarIntsSupplier.fromIntBuffer( @@ -271,7 +271,7 @@ public void run() stopLatch.await(); } finally { - CloseQuietly.close(columnarInts2); + CloseableUtils.closeAndWrapExceptions(columnarInts2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java index 284aea33b758..338897be752d 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java @@ -23,10 +23,10 @@ import com.google.common.primitives.Doubles; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Ignore; import org.junit.Rule; @@ -321,7 +321,7 @@ public void run() stopLatch.await(); } finally { - CloseQuietly.close(indexed2); + CloseableUtils.closeAndWrapExceptions(indexed2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java index d11c089abf25..c831170f48f7 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java @@ -23,10 +23,10 @@ import com.google.common.primitives.Floats; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Ignore; import org.junit.Rule; @@ -78,8 +78,29 @@ public static Iterable compressionStrategies() private final float[] values2 = {13.2f, 6.1f, 0.001f, 123f, 12572f, 123.1f, 784.4f, 6892.8634f, 8.341111f}; private final float[] values3 = {0.001f, 0.001f, 0.001f, 0.001f, 0.001f, 100f, 100f, 100f, 100f, 100f}; private final float[] values4 = {0f, 0f, 0f, 0f, 0.01f, 0f, 0f, 0f, 21.22f, 0f, 0f, 0f, 0f, 0f, 0f}; - private final float[] values5 = {123.16f, 1.12f, 62.00f, 462.12f, 517.71f, 56.54f, 971.32f, 824.22f, 472.12f, 625.26f}; - private final float[] values6 = {1000000f, 1000001f, 1000002f, 1000003f, 1000004f, 1000005f, 1000006f, 1000007f, 1000008f}; + private final float[] values5 = { + 123.16f, + 1.12f, + 62.00f, + 462.12f, + 517.71f, + 56.54f, + 971.32f, + 824.22f, + 472.12f, + 625.26f + }; + private final float[] values6 = { + 1000000f, + 1000001f, + 1000002f, + 1000003f, + 1000004f, + 1000005f, + 1000006f, + 1000007f, + 1000008f + }; private final float[] values7 = { Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY, 12378.5734f, -12718243.7496f, -93653653.1f, 12743153.385534f, 21431.414538f, 65487435436632.123f, -43734526234564.65f @@ -321,7 +342,7 @@ public void run() stopLatch.await(); } finally { - CloseQuietly.close(indexed2); + CloseableUtils.closeAndWrapExceptions(indexed2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java index 675c49420cb0..dd428f9f458d 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java @@ -23,10 +23,10 @@ import com.google.common.primitives.Longs; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Ignore; import org.junit.Rule; @@ -350,7 +350,7 @@ public void run() stopLatch.await(); } finally { - CloseQuietly.close(indexed2); + CloseableUtils.closeAndWrapExceptions(indexed2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java index 5ba842013bba..bb5868f6df2f 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java @@ -25,7 +25,6 @@ import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.commons.io.IOUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -34,6 +33,7 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -154,7 +154,7 @@ private void checkSerializedSizeAndData(int chunkSize) throws Exception for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); } @Test @@ -268,8 +268,7 @@ private void checkV2SerializedSizeAndData(int chunkSize) throws Exception for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); - mapper.close(); + CloseableUtils.closeAll(columnarInts, mapper); } @Test diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java index f113dc3510ad..48a443ef6610 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java @@ -27,9 +27,9 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.segment.CompressedPools; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -67,7 +67,7 @@ public static Iterable compressionStrategies() ); } - private static final int[] MAX_VALUES = new int[] {0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF}; + private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF}; public CompressedVSizeColumnarIntsSupplierTest(CompressionStrategy compressionStrategy, ByteOrder byteOrder) { @@ -86,7 +86,7 @@ public CompressedVSizeColumnarIntsSupplierTest(CompressionStrategy compressionSt public void setUp() { closer = Closer.create(); - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); columnarInts = null; supplier = null; vals = null; @@ -95,13 +95,12 @@ public void setUp() @After public void tearDown() throws Exception { - CloseQuietly.close(columnarInts); - closer.close(); + CloseableUtils.closeAll(columnarInts, closer); } private void setupSimple(final int chunkSize) { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; @@ -126,7 +125,7 @@ private void setupSimpleWithSerde(final int chunkSize) throws IOException private void makeWithSerde(final int chunkSize) throws IOException { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedVSizeColumnarIntsSupplier theSupplier = CompressedVSizeColumnarIntsSupplier.fromList( @@ -212,8 +211,14 @@ public void testChunkTooBig() throws Exception public void testmaxIntsInBuffer() { Assert.assertEquals(CompressedPools.BUFFER_SIZE, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(1)); - Assert.assertEquals(CompressedPools.BUFFER_SIZE / 2, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(2)); - Assert.assertEquals(CompressedPools.BUFFER_SIZE / 4, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(4)); + Assert.assertEquals( + CompressedPools.BUFFER_SIZE / 2, + CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(2) + ); + Assert.assertEquals( + CompressedPools.BUFFER_SIZE / 4, + CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(4) + ); Assert.assertEquals(CompressedPools.BUFFER_SIZE, 0x10000); // nearest power of 2 is 2^14 Assert.assertEquals(1 << 14, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(3)); @@ -330,7 +335,7 @@ public void run() stopLatch.await(); } finally { - CloseQuietly.close(columnarInts2); + CloseableUtils.closeAndWrapExceptions(columnarInts2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java index de6485fff1f7..152c7b1c7d83 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java @@ -25,7 +25,6 @@ import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -34,6 +33,7 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -322,7 +322,7 @@ private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkF Assert.assertEquals(subVals.get(j), vals.get(i)[j]); } } - CloseQuietly.close(columnarMultiInts); + CloseableUtils.closeAndWrapExceptions(columnarMultiInts); mapper.close(); } } @@ -395,7 +395,7 @@ private void generateV2SerializedSizeAndData(long numRows, int maxValue, int max Assert.assertEquals(subVals.get(j), expected[j]); } } - CloseQuietly.close(columnarMultiInts); + CloseableUtils.closeAndWrapExceptions(columnarMultiInts); mapper.close(); } } diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 651ae6350492..8d25619d98fe 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -32,7 +32,6 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.logger.Logger; @@ -56,6 +55,7 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.server.QueryResource; +import org.apache.druid.utils.CloseableUtils; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.handler.codec.http.HttpChunk; @@ -517,7 +517,7 @@ public JsonParserIterator make() @Override public void cleanup(JsonParserIterator iterFromMake) { - CloseQuietly.close(iterFromMake); + CloseableUtils.closeAndWrapExceptions(iterFromMake); } } ); diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java index 9478f246764f..7c0b8644c519 100644 --- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java +++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java @@ -27,12 +27,12 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.Query; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import javax.servlet.http.HttpServletResponse; @@ -98,7 +98,7 @@ public boolean hasNext() return false; } if (jp.getCurrentToken() == JsonToken.END_ARRAY) { - CloseQuietly.close(jp); + CloseableUtils.closeAndWrapExceptions(jp); return false; } diff --git a/server/src/main/java/org/apache/druid/client/cache/HybridCache.java b/server/src/main/java/org/apache/druid/client/cache/HybridCache.java index 65bf957a27ed..509b19aa6538 100644 --- a/server/src/main/java/org/apache/druid/client/cache/HybridCache.java +++ b/server/src/main/java/org/apache/druid/client/cache/HybridCache.java @@ -156,7 +156,7 @@ public void close(String namespace) @LifecycleStop public void close() throws IOException { - CloseableUtils.closeBoth(level1, level2); + CloseableUtils.closeAll(level1, level2); } @Override diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index 533389a02c3e..6fc255738053 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -31,15 +31,16 @@ import org.apache.druid.curator.cache.PathChildrenCacheFactory; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -135,12 +136,16 @@ public void stop() started = false; + Closer closer = Closer.create(); for (PathChildrenCache cache : listeners.values()) { closer.register(cache); } try { - CloseQuietly.close(closer); + closer.close(); + } + catch (IOException e) { + throw new RuntimeException(e); } finally { pathChildrenCacheExecutor.shutdown(); @@ -414,8 +419,7 @@ private void startCache(PathChildrenCache cache) cache.start(); } catch (Exception e) { - CloseQuietly.close(cache); - throw new RuntimeException(e); + throw CloseableUtils.closeInCatch(new RuntimeException(e), cache); } } diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index 10b2a6c8076f..d9270d911b97 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -30,9 +30,9 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.DruidNode; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import java.util.concurrent.ExecutorService; @@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference; /** + * */ public class CuratorDruidLeaderSelector implements DruidLeaderSelector { @@ -100,8 +101,11 @@ public void isLeader() log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); // give others a chance to become leader. - final LeaderLatch oldLatch = createNewLeaderLatchWithListener(); - CloseQuietly.close(oldLatch); + CloseableUtils.closeAndSuppressExceptions( + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + ); + leader = false; try { //Small delay before starting the latch so that others waiting are chosen to become leader. @@ -202,7 +206,8 @@ public void unregisterListener() if (!lifecycleLock.canStop()) { throw new ISE("can't stop."); } - CloseQuietly.close(leaderLatch.get()); + + CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); listenerExecutor.shutdownNow(); } } diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java index 383ec204df31..10bd304af38f 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java @@ -161,7 +161,7 @@ public void stop() throws IOException closer.registerAll(nodeRoleWatchers.values()); closer.registerAll(nodeDiscoverers); - CloseableUtils.closeBoth(closer, listenerExecutor::shutdownNow); + CloseableUtils.closeAll(closer, listenerExecutor::shutdownNow); } private static class NodeRoleWatcher implements DruidNodeDiscovery, Closeable @@ -228,7 +228,7 @@ private static class NodeRoleWatcher implements DruidNodeDiscovery, Closeable @Override public void close() throws IOException { - CloseableUtils.closeBoth(cache, cacheExecutor::shutdownNow); + CloseableUtils.closeAll(cache, cacheExecutor::shutdownNow); } @Override diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 887ad6d2b2aa..c1d062a5a15b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -30,7 +30,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -64,6 +63,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.Interval; import java.io.Closeable; @@ -250,8 +250,7 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner); } catch (RuntimeException e) { - CloseQuietly.close(segmentAndCloseable.rhs); - throw e; + throw CloseableUtils.closeInCatch(e, segmentAndCloseable.rhs); } } ) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java index 52e805a2361f..88d6d6ede228 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java @@ -119,7 +119,7 @@ public synchronized void close() throws IOException { if (!closed) { closed = true; - CloseableUtils.closeBoth(firehose, shutdownExec::shutdownNow); + CloseableUtils.closeAll(firehose, shutdownExec::shutdownNow); } } } diff --git a/server/src/main/java/org/apache/druid/server/log/FileRequestLogger.java b/server/src/main/java/org/apache/druid/server/log/FileRequestLogger.java index 0cab5d351552..fac2a7833a09 100644 --- a/server/src/main/java/org/apache/druid/server/log/FileRequestLogger.java +++ b/server/src/main/java/org/apache/druid/server/log/FileRequestLogger.java @@ -22,10 +22,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.RequestLogLine; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.MutableDateTime; @@ -43,9 +44,12 @@ import java.util.concurrent.ScheduledExecutorService; /** + * */ public class FileRequestLogger implements RequestLogger { + private static final Logger log = new Logger(FileRequestLogger.class); + private final ObjectMapper objectMapper; private final ScheduledExecutorService exec; private final File baseDir; @@ -93,7 +97,15 @@ public ScheduledExecutors.Signal call() try { synchronized (lock) { currentDay = currentDay.plusDays(1); - CloseQuietly.close(fileWriter); + + CloseableUtils.closeAndSuppressExceptions( + fileWriter, + e -> log.warn( + "Could not close log file for %s. Creating new log file anyway.", + currentDay + ) + ); + fileWriter = getFileWriter(); } } @@ -124,7 +136,7 @@ private OutputStreamWriter getFileWriter() throws FileNotFoundException public void stop() { synchronized (lock) { - CloseQuietly.close(fileWriter); + CloseableUtils.closeAndWrapExceptions(fileWriter); } } From 8dad2b70aa27d618a276b89160dd9f36ffa1f3dd Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 6 Aug 2020 18:36:48 -0700 Subject: [PATCH 02/11] Remove unused import. --- .../common/task/AppenderatorDriverRealtimeIndexTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 6b9e222abc1f..fca09a674c97 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -65,7 +65,6 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.NoopQueryRunner; From 69f3e8483f75cf6ceccf6bf118a1d6c4ac1a50c0 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 7 Aug 2020 00:22:05 -0700 Subject: [PATCH 03/11] Fix up various issues. --- .../apache/druid/utils/CloseableUtils.java | 21 ++++ .../druid/utils/CloseableUtilsTest.java | 110 +++++++++++++++++- .../groupby/strategy/GroupByStrategyV2.java | 12 +- .../druid/segment/join/HashJoinSegment.java | 4 +- .../druid/curator/announcement/Announcer.java | 4 +- .../appenderator/SinkQuerySegmentWalker.java | 4 +- 6 files changed, 139 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java index 72c2c8dbfabe..bb326dd31ed9 100644 --- a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java @@ -93,6 +93,27 @@ public static RuntimeException closeInCatch( throw caught; } + /** + * Like {@link #closeInCatch} but wraps {@param caught} in a {@link RuntimeException} if it is a checked exception. + */ + public static RuntimeException closeAndWrapInCatch( + final E caught, + @Nullable final Closeable closeable + ) + { + try { + throw closeInCatch(caught, closeable); + } + catch (RuntimeException | Error e) { + // Unchecked exception. + throw e; + } + catch (Throwable e) { + // Checked exception; must wrap. + throw new RuntimeException(e); + } + } + /** * Like {@link Closeable#close()} but wraps IOExceptions in RuntimeExceptions. */ diff --git a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java index 858b06e63dd8..1b42b3481bdf 100644 --- a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java +++ b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java @@ -23,6 +23,7 @@ import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Test; +import org.junit.internal.matchers.ThrowableCauseMatcher; import org.junit.internal.matchers.ThrowableMessageMatcher; import javax.annotation.Nullable; @@ -147,7 +148,7 @@ public void test_closeAndSuppressExceptions_runtimeException() } @Test - public void test_closeInCatch_improper() throws Exception + public void test_closeInCatch_improper() { Exception e = null; try { @@ -168,7 +169,7 @@ public void test_closeInCatch_improper() throws Exception } @Test - public void test_closeInCatch_quiet() throws Exception + public void test_closeInCatch_quiet() { Exception e = null; try { @@ -194,7 +195,7 @@ public void test_closeInCatch_ioException() Exception e = null; try { //noinspection ThrowableNotThrown - CloseableUtils.closeInCatch(new RuntimeException("this one was caught"), ioExceptionCloseable); + CloseableUtils.closeInCatch(new IOException("this one was caught"), ioExceptionCloseable); } catch (Exception e1) { e = e1; @@ -203,7 +204,7 @@ public void test_closeInCatch_ioException() Assert.assertTrue(ioExceptionCloseable.isClosed()); // First exception - Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class)); Assert.assertThat( e, ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) @@ -240,6 +241,107 @@ public void test_closeInCatch_runtimeException() Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); } + @Test + public void test_closeAndWrapInCatch_improper() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(null, quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(IllegalStateException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Must be called with non-null caught exception")) + ); + } + + @Test + public void test_closeAndWrapInCatch_quiet() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + } + + @Test + public void test_closeAndWrapInCatch_ioException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new IOException("this one was caught"), ioExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(ioExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("java.io.IOException: this one was caught")) + ); + Assert.assertThat(e, ThrowableCauseMatcher.hasCause(CoreMatchers.instanceOf(IOException.class))); + Assert.assertThat( + e, + ThrowableCauseMatcher.hasCause( + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ) + ); + + // Second exception + Assert.assertEquals(1, e.getCause().getSuppressed().length); + Assert.assertThat(e.getCause().getSuppressed()[0], CoreMatchers.instanceOf(IOException.class)); + } + + @Test + public void test_closeAndWrapInCatch_runtimeException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), runtimeExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(runtimeExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + private static void assertClosed(final TestCloseable... closeables) { for (TestCloseable closeable : closeables) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index 9e0372fe2b2b..3f33acf7d34b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -329,8 +329,8 @@ public Sequence processSubqueryResult( finalResultSupplier ); } - catch (Exception ex) { - throw CloseableUtils.closeInCatch(new RuntimeException(ex), resultSupplier); + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, resultSupplier); } } @@ -477,8 +477,8 @@ public Sequence processSubtotalsSpec( resultSupplierOne //this will close resources allocated by resultSupplierOne after sequence read ); } - catch (Exception ex) { - throw CloseableUtils.closeInCatch(new RuntimeException(ex), resultSupplierOne); + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, resultSupplierOne); } } @@ -507,8 +507,8 @@ private Sequence processSubtotalsResultAndOptionallyClose( null ); } - catch (Exception ex) { - throw CloseableUtils.closeInCatch(new RuntimeException(ex), baseResultsSupplier.get()); + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, baseResultsSupplier.get()); } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java index 2fed9972e603..2224297782be 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java @@ -128,8 +128,8 @@ public Optional acquireReferences() return Optional.of(closer); } } - catch (Exception ex) { - throw CloseableUtils.closeInCatch(new RuntimeException(ex), closer); + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, closer); } } } diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index 6fc255738053..ed3c393373c9 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -418,8 +418,8 @@ private void startCache(PathChildrenCache cache) try { cache.start(); } - catch (Exception e) { - throw CloseableUtils.closeInCatch(new RuntimeException(e), cache); + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, cache); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index c1d062a5a15b..d7023a7084d1 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -249,8 +249,8 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final ); return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner); } - catch (RuntimeException e) { - throw CloseableUtils.closeInCatch(e, segmentAndCloseable.rhs); + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, segmentAndCloseable.rhs); } } ) From d9f3820a789d8922a3090de254f658864836f7ff Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 13 Aug 2020 13:29:35 -0700 Subject: [PATCH 04/11] Adjustments to tests. --- .../apache/druid/utils/CloseableUtils.java | 17 +++++++- .../druid/utils/CloseableUtilsTest.java | 40 ++++++++++++++++++- .../druid/curator/announcement/Announcer.java | 8 +--- .../druid/server/log/FileRequestLogger.java | 5 +-- 4 files changed, 55 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java index bb326dd31ed9..6cd1320d00e5 100644 --- a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java @@ -19,13 +19,16 @@ package org.apache.druid.utils; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.druid.java.util.common.io.Closer; import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.function.Consumer; /** @@ -44,12 +47,22 @@ public final class CloseableUtils * of creating a Closer and registering objects in it. */ public static void closeAll(Closeable first, Closeable... others) throws IOException + { + final List closeables = new ArrayList<>(others.length + 1); + closeables.add(first); + closeables.addAll(Arrays.asList(others)); + closeAll(closeables); + } + + /** + * Close all the provided {@param closeables}, from first to last. + */ + public static void closeAll(Iterable closeables) throws IOException { final Closer closer = Closer.create(); // Register in reverse order, so we close from first to last. - closer.registerAll(Lists.reverse(Arrays.asList(others))); - closer.register(first); + closer.registerAll(Lists.reverse(ImmutableList.copyOf(closeables))); closer.close(); } diff --git a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java index 1b42b3481bdf..b7650afc915b 100644 --- a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java +++ b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java @@ -20,6 +20,7 @@ package org.apache.druid.utils; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Test; @@ -45,14 +46,21 @@ public class CloseableUtilsTest private final Consumer chomper = e -> chomped.incrementAndGet(); @Test - public void test_closeAll_quiet() throws IOException + public void test_closeAll_array_quiet() throws IOException { CloseableUtils.closeAll(quietCloseable, quietCloseable2); assertClosed(quietCloseable, quietCloseable2); } @Test - public void test_closeAll_loud() + public void test_closeAll_list_quiet() throws IOException + { + CloseableUtils.closeAll(ImmutableList.of(quietCloseable, quietCloseable2)); + assertClosed(quietCloseable, quietCloseable2); + } + + @Test + public void test_closeAll_array_loud() { Exception e = null; try { @@ -72,6 +80,34 @@ public void test_closeAll_loud() Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(RuntimeException.class)); } + @Test + public void test_closeAll_list_loud() + { + Exception e = null; + try { + CloseableUtils.closeAll( + ImmutableList.of( + quietCloseable, + ioExceptionCloseable, + quietCloseable2, + runtimeExceptionCloseable + ) + ); + } + catch (Exception e2) { + e = e2; + } + + assertClosed(quietCloseable, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class)); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(RuntimeException.class)); + } + @Test public void test_closeAndWrapExceptions_null() { diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index ed3c393373c9..b65d85644d40 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -31,7 +31,6 @@ import org.apache.druid.curator.cache.PathChildrenCacheFactory; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; @@ -136,13 +135,8 @@ public void stop() started = false; - - Closer closer = Closer.create(); - for (PathChildrenCache cache : listeners.values()) { - closer.register(cache); - } try { - closer.close(); + CloseableUtils.closeAll(listeners.values()); } catch (IOException e) { throw new RuntimeException(e); diff --git a/server/src/main/java/org/apache/druid/server/log/FileRequestLogger.java b/server/src/main/java/org/apache/druid/server/log/FileRequestLogger.java index fac2a7833a09..e8c136112c58 100644 --- a/server/src/main/java/org/apache/druid/server/log/FileRequestLogger.java +++ b/server/src/main/java/org/apache/druid/server/log/FileRequestLogger.java @@ -100,10 +100,7 @@ public ScheduledExecutors.Signal call() CloseableUtils.closeAndSuppressExceptions( fileWriter, - e -> log.warn( - "Could not close log file for %s. Creating new log file anyway.", - currentDay - ) + e -> log.warn("Could not close log file for %s. Creating new log file anyway.", currentDay) ); fileWriter = getFileWriter(); From 13b10f1a364c113cb56339f3bfd9d88b3b8658af Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 13 Aug 2020 17:01:00 -0700 Subject: [PATCH 05/11] Fix null handling. --- .../java/org/apache/druid/utils/CloseableUtils.java | 3 +-- .../org/apache/druid/utils/CloseableUtilsTest.java | 11 ++++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java index 6cd1320d00e5..0dd1b9270c7b 100644 --- a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java @@ -19,7 +19,6 @@ package org.apache.druid.utils; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.druid.java.util.common.io.Closer; @@ -62,7 +61,7 @@ public static void closeAll(Iterable closeables) throws final Closer closer = Closer.create(); // Register in reverse order, so we close from first to last. - closer.registerAll(Lists.reverse(ImmutableList.copyOf(closeables))); + closer.registerAll(Lists.reverse(Lists.newArrayList(closeables))); closer.close(); } diff --git a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java index b7650afc915b..fd7e36a159e3 100644 --- a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java +++ b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java @@ -20,7 +20,6 @@ package org.apache.druid.utils; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Test; @@ -30,6 +29,7 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -48,14 +48,14 @@ public class CloseableUtilsTest @Test public void test_closeAll_array_quiet() throws IOException { - CloseableUtils.closeAll(quietCloseable, quietCloseable2); + CloseableUtils.closeAll(quietCloseable, null, quietCloseable2); assertClosed(quietCloseable, quietCloseable2); } @Test public void test_closeAll_list_quiet() throws IOException { - CloseableUtils.closeAll(ImmutableList.of(quietCloseable, quietCloseable2)); + CloseableUtils.closeAll(Arrays.asList(quietCloseable, null, quietCloseable2)); assertClosed(quietCloseable, quietCloseable2); } @@ -64,7 +64,7 @@ public void test_closeAll_array_loud() { Exception e = null; try { - CloseableUtils.closeAll(quietCloseable, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable); + CloseableUtils.closeAll(quietCloseable, null, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable); } catch (Exception e2) { e = e2; @@ -86,8 +86,9 @@ public void test_closeAll_list_loud() Exception e = null; try { CloseableUtils.closeAll( - ImmutableList.of( + Arrays.asList( quietCloseable, + null, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable From 1fa8c4b7e784ed4cfa3537312280f8f9d4881147 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 25 Aug 2020 02:21:29 -0700 Subject: [PATCH 06/11] Additional test. --- .../apache/druid/client/cache/HybridCacheTest.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java b/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java index b5d96ebe2c22..aff62b555718 100644 --- a/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java +++ b/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java @@ -35,6 +35,7 @@ import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.HashSet; import java.util.Map; @@ -82,10 +83,12 @@ public void configure(Binder binder) } @Test - public void testSanity() + public void testSanity() throws IOException { - final MapCache l1 = new MapCache(new ByteCountingLRUMap(1024 * 1024)); - final MapCache l2 = new MapCache(new ByteCountingLRUMap(1024 * 1024)); + final ByteCountingLRUMap l1Map = new ByteCountingLRUMap(1024 * 1024); + final ByteCountingLRUMap l2Map = new ByteCountingLRUMap(1024 * 1024); + final MapCache l1 = new MapCache(l1Map); + final MapCache l2 = new MapCache(l2Map); HybridCache cache = new HybridCache(new HybridCacheConfig(), l1, l2); final Cache.NamedKey key1 = new Cache.NamedKey("a", HI); @@ -175,5 +178,10 @@ public void testSanity() Assert.assertEquals(hits + 1, cache.getStats().getNumHits()); Assert.assertEquals(misses + 1, cache.getStats().getNumMisses()); } + + // test close + cache.close(); + Assert.assertEquals("l1 size after close()", 0, l1Map.size()); + Assert.assertEquals("l2 size after close()", 0, l2Map.size()); } } From 45e7fb17e01e89fc098dc06ae280d25c1ee2526e Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 7 Jan 2021 22:55:51 -0800 Subject: [PATCH 07/11] Adjustments from review. --- .../guava/ParallelMergeCombiningSequence.java | 2 +- .../org/apache/druid/utils/CloseableUtilsTest.java | 4 ++-- .../data/BlockLayoutColumnarDoublesSupplier.java | 2 +- .../data/BlockLayoutColumnarFloatsSupplier.java | 2 +- .../data/BlockLayoutColumnarLongsSupplier.java | 4 ++-- .../data/CompressedColumnarIntsSupplier.java | 2 +- .../data/CompressedVSizeColumnarIntsSupplier.java | 2 +- .../apache/druid/segment/join/HashJoinSegment.java | 8 +++++++- ...ressedVSizeColumnarMultiIntsSerializerTest.java | 14 +++++++++----- 9 files changed, 25 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 5423e4712176..44a9a9ad62e4 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -1383,6 +1383,6 @@ private static void closeAllCursors(final Collection { Closer closer = Closer.create(); closer.registerAll(cursors); - CloseableUtils.closeAndWrapExceptions(closer); + CloseableUtils.closeAndSuppressExceptions(closer, e -> LOG.warn(e, "Failed to close result cursors")); } } diff --git a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java index fd7e36a159e3..31dd011d6534 100644 --- a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java +++ b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java @@ -77,7 +77,7 @@ public void test_closeAll_array_loud() // Second exception Assert.assertEquals(1, e.getSuppressed().length); - Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); } @Test @@ -106,7 +106,7 @@ public void test_closeAll_list_loud() // Second exception Assert.assertEquals(1, e.getSuppressed().length); - Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); } @Test diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java index 7c6c006f68c6..5859f07cb8c5 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java @@ -168,7 +168,7 @@ public void get(final double[] out, final int[] indexes, final int length) protected void loadBuffer(int bufferNum) { - CloseableUtils.closeAndWrapExceptions(holder); + holder.close(); holder = singleThreadedDoubleBuffers.get(bufferNum); // asDoubleBuffer() makes the doubleBuffer's position = 0 doubleBuffer = holder.get().asDoubleBuffer(); diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java index 4999e655a116..5876b3a87472 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java @@ -168,7 +168,7 @@ public void get(final float[] out, final int[] indexes, final int length) protected void loadBuffer(int bufferNum) { - CloseableUtils.closeAndWrapExceptions(holder); + holder.close(); holder = singleThreadedFloatBuffers.get(bufferNum); // asFloatBuffer() makes the floatBuffer's position = 0 floatBuffer = holder.get().asFloatBuffer(); diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java index c28faa160bd2..efcf76f27444 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java @@ -83,7 +83,7 @@ public long get(int index) @Override protected void loadBuffer(int bufferNum) { - CloseableUtils.closeAndWrapExceptions(holder); + holder.close(); holder = singleThreadedLongBuffers.get(bufferNum); buffer = holder.get(); // asLongBuffer() makes the longBuffer's position = 0 @@ -190,7 +190,7 @@ public void get(final long[] out, final int[] indexes, final int length) protected void loadBuffer(int bufferNum) { - CloseableUtils.closeAndWrapExceptions(holder); + holder.close(); holder = singleThreadedLongBuffers.get(bufferNum); buffer = holder.get(); currBufferNum = bufferNum; diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java index f55fdedff951..9278b19b3e23 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java @@ -323,7 +323,7 @@ public int get(int index) protected void loadBuffer(int bufferNum) { - CloseableUtils.closeAndWrapExceptions(holder); + holder.close(); holder = singleThreadedIntBuffers.get(bufferNum); // asIntBuffer() makes the buffer's position = 0 buffer = holder.get().asIntBuffer(); diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java index 359fc1337b30..606f91806e2f 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java @@ -433,7 +433,7 @@ int _get(ByteBuffer buffer, boolean bigEndian, final int bufferIndex) protected void loadBuffer(int bufferNum) { - CloseableUtils.closeAndWrapExceptions(holder); + holder.close(); holder = singleThreadedBuffers.get(bufferNum); ByteBuffer bb = holder.get(); ByteOrder byteOrder = bb.order(); diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java index 2224297782be..89ccdd9ed5b9 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java @@ -21,6 +21,7 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.StorageAdapter; @@ -42,6 +43,8 @@ */ public class HashJoinSegment implements SegmentReference { + private static final Logger log = new Logger(HashJoinSegment.class); + private final SegmentReference baseSegment; private final List clauses; private final JoinFilterPreAnalysis joinFilterPreAnalysis; @@ -129,7 +132,10 @@ public Optional acquireReferences() } } catch (Throwable e) { - throw CloseableUtils.closeAndWrapInCatch(e, closer); + // acquireReferences is not permitted to throw exceptions. + CloseableUtils.closeAndSuppressExceptions(closer, e::addSuppressed); + log.warn(e, "Exception encountered while trying to acquire reference"); + return Optional.empty(); } } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java index 152c7b1c7d83..6877416e764b 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java @@ -322,12 +322,17 @@ private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkF Assert.assertEquals(subVals.get(j), vals.get(i)[j]); } } - CloseableUtils.closeAndWrapExceptions(columnarMultiInts); - mapper.close(); + CloseableUtils.closeAll(columnarMultiInts, mapper); } } - private void generateV2SerializedSizeAndData(long numRows, int maxValue, int maxValuesPerRow, int offsetChunkFactor, int valueChunkFactor) throws Exception + private void generateV2SerializedSizeAndData( + long numRows, + int maxValue, + int maxValuesPerRow, + int offsetChunkFactor, + int valueChunkFactor + ) throws Exception { File tmpDirectory = FileUtils.createTempDir(StringUtils.format( "CompressedVSizeIndexedV3WriterTest_%d_%d", @@ -395,8 +400,7 @@ private void generateV2SerializedSizeAndData(long numRows, int maxValue, int max Assert.assertEquals(subVals.get(j), expected[j]); } } - CloseableUtils.closeAndWrapExceptions(columnarMultiInts); - mapper.close(); + CloseableUtils.closeAll(columnarMultiInts, mapper); } } } From 0272ab57f48281a3443a175f688eaf9f483f3b5c Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 13 Jan 2021 17:43:48 -0800 Subject: [PATCH 08/11] Fixup style stuff. --- .../druid/segment/column/StringDictionaryEncodedColumn.java | 2 +- .../druid/segment/data/BlockLayoutColumnarDoublesSupplier.java | 1 - .../druid/segment/data/BlockLayoutColumnarFloatsSupplier.java | 1 - .../druid/segment/data/BlockLayoutColumnarLongsSupplier.java | 1 - .../druid/segment/data/CompressedColumnarIntsSupplier.java | 1 - .../druid/segment/data/CompressedVSizeColumnarIntsSupplier.java | 1 - 6 files changed, 1 insertion(+), 6 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringDictionaryEncodedColumn.java b/processing/src/main/java/org/apache/druid/segment/column/StringDictionaryEncodedColumn.java index e52d985ae5aa..603b52fcbe76 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/StringDictionaryEncodedColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/column/StringDictionaryEncodedColumn.java @@ -40,8 +40,8 @@ import org.apache.druid.segment.vector.ReadableVectorInspector; import org.apache.druid.segment.vector.ReadableVectorOffset; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; -import org.apache.druid.utils.CloseableUtils; import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import java.io.IOException; diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java index 5859f07cb8c5..5b43b5a67087 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java @@ -21,7 +21,6 @@ import com.google.common.base.Supplier; import org.apache.druid.collections.ResourceHolder; -import org.apache.druid.utils.CloseableUtils; import java.nio.ByteBuffer; import java.nio.ByteOrder; diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java index 5876b3a87472..1b6ec1a5022f 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java @@ -21,7 +21,6 @@ import com.google.common.base.Supplier; import org.apache.druid.collections.ResourceHolder; -import org.apache.druid.utils.CloseableUtils; import java.nio.ByteBuffer; import java.nio.ByteOrder; diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java index efcf76f27444..a7ba0619c270 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java @@ -21,7 +21,6 @@ import com.google.common.base.Supplier; import org.apache.druid.collections.ResourceHolder; -import org.apache.druid.utils.CloseableUtils; import java.nio.ByteBuffer; import java.nio.ByteOrder; diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java index 9278b19b3e23..918810e9bbe7 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java @@ -30,7 +30,6 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.MetaSerdeHelper; -import org.apache.druid.utils.CloseableUtils; import java.io.IOException; import java.nio.ByteBuffer; diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java index 606f91806e2f..d25b2f878b06 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java @@ -31,7 +31,6 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.MetaSerdeHelper; -import org.apache.druid.utils.CloseableUtils; import java.io.IOException; import java.nio.ByteBuffer; From 113d62216223e1f5252aac430ba1cfad89bd70da Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 14 Jan 2021 12:22:57 -0800 Subject: [PATCH 09/11] Fix NPE caused by holder starting out null. --- .../segment/data/BlockLayoutColumnarDoublesSupplier.java | 4 +++- .../segment/data/BlockLayoutColumnarFloatsSupplier.java | 4 +++- .../segment/data/BlockLayoutColumnarLongsSupplier.java | 8 ++++++-- .../segment/data/CompressedColumnarIntsSupplier.java | 4 +++- .../segment/data/CompressedVSizeColumnarIntsSupplier.java | 4 +++- 5 files changed, 18 insertions(+), 6 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java index 5b43b5a67087..28b3c5d2b6c3 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java @@ -167,7 +167,9 @@ public void get(final double[] out, final int[] indexes, final int length) protected void loadBuffer(int bufferNum) { - holder.close(); + if (holder != null) { + holder.close(); + } holder = singleThreadedDoubleBuffers.get(bufferNum); // asDoubleBuffer() makes the doubleBuffer's position = 0 doubleBuffer = holder.get().asDoubleBuffer(); diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java index 1b6ec1a5022f..c11ba18ec3df 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java @@ -167,7 +167,9 @@ public void get(final float[] out, final int[] indexes, final int length) protected void loadBuffer(int bufferNum) { - holder.close(); + if (holder != null) { + holder.close(); + } holder = singleThreadedFloatBuffers.get(bufferNum); // asFloatBuffer() makes the floatBuffer's position = 0 floatBuffer = holder.get().asFloatBuffer(); diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java index a7ba0619c270..05473320553f 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java @@ -82,7 +82,9 @@ public long get(int index) @Override protected void loadBuffer(int bufferNum) { - holder.close(); + if (holder != null) { + holder.close(); + } holder = singleThreadedLongBuffers.get(bufferNum); buffer = holder.get(); // asLongBuffer() makes the longBuffer's position = 0 @@ -189,7 +191,9 @@ public void get(final long[] out, final int[] indexes, final int length) protected void loadBuffer(int bufferNum) { - holder.close(); + if (holder != null) { + holder.close(); + } holder = singleThreadedLongBuffers.get(bufferNum); buffer = holder.get(); currBufferNum = bufferNum; diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java index 918810e9bbe7..e685721eb88e 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java @@ -322,7 +322,9 @@ public int get(int index) protected void loadBuffer(int bufferNum) { - holder.close(); + if (holder != null) { + holder.close(); + } holder = singleThreadedIntBuffers.get(bufferNum); // asIntBuffer() makes the buffer's position = 0 buffer = holder.get().asIntBuffer(); diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java index d25b2f878b06..e63d6702f8ba 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java @@ -432,7 +432,9 @@ int _get(ByteBuffer buffer, boolean bigEndian, final int bufferIndex) protected void loadBuffer(int bufferNum) { - holder.close(); + if (holder != null) { + holder.close(); + } holder = singleThreadedBuffers.get(bufferNum); ByteBuffer bb = holder.get(); ByteOrder byteOrder = bb.order(); From c81478b7cce0ed8cac373fc6012b6fb5af45c2b9 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 22 Oct 2021 13:13:07 -0700 Subject: [PATCH 10/11] Fix spelling. --- core/src/main/java/org/apache/druid/utils/CloseableUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java index 0dd1b9270c7b..140105823490 100644 --- a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java @@ -147,7 +147,7 @@ public static void closeAndWrapExceptions(@Nullable final Closeable closeable) * Like {@link Closeable#close()} but sends any exceptions to the provided Consumer, and then throws them away. * * If the Consumer throws an exception, that exception is thrown by this method. So if your intent is to chomp - * exceptions, you should avoid writing a Consumer that might thrown an exception. + * exceptions, you should avoid writing a Consumer that might throw an exception. * * Throwables that are not Exceptions are thrown rather than sent to the Consumer. */ From 762a88f4ff1e05fbef1f69597d7c9e734743c1b4 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 23 Oct 2021 08:58:01 -0700 Subject: [PATCH 11/11] Chomp Throwables too. --- .../apache/druid/utils/CloseableUtils.java | 8 +-- .../druid/utils/CloseableUtilsTest.java | 56 ++++++++++++++++++- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java index 140105823490..6a0a143b13e0 100644 --- a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java @@ -144,16 +144,14 @@ public static void closeAndWrapExceptions(@Nullable final Closeable closeable) } /** - * Like {@link Closeable#close()} but sends any exceptions to the provided Consumer, and then throws them away. + * Like {@link Closeable#close()} but sends any exceptions to the provided Consumer and then returns quietly. * * If the Consumer throws an exception, that exception is thrown by this method. So if your intent is to chomp * exceptions, you should avoid writing a Consumer that might throw an exception. - * - * Throwables that are not Exceptions are thrown rather than sent to the Consumer. */ public static void closeAndSuppressExceptions( @Nullable final Closeable closeable, - final Consumer chomper + final Consumer chomper ) { if (closeable == null) { @@ -163,7 +161,7 @@ public static void closeAndSuppressExceptions( try { closeable.close(); } - catch (Exception e) { + catch (Throwable e) { chomper.accept(e); } } diff --git a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java index 31dd011d6534..57fa61079d2d 100644 --- a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java +++ b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java @@ -40,10 +40,11 @@ public class CloseableUtilsTest private final TestCloseable quietCloseable2 = new TestCloseable(null); private final TestCloseable ioExceptionCloseable = new TestCloseable(new IOException()); private final TestCloseable runtimeExceptionCloseable = new TestCloseable(new IllegalArgumentException()); + private final TestCloseable assertionErrorCloseable = new TestCloseable(new AssertionError()); // For closeAndSuppressException tests. private final AtomicLong chomped = new AtomicLong(); - private final Consumer chomper = e -> chomped.incrementAndGet(); + private final Consumer chomper = e -> chomped.incrementAndGet(); @Test public void test_closeAll_array_quiet() throws IOException @@ -153,6 +154,21 @@ public void test_closeAndWrapExceptions_runtimeException() Assert.assertThat(e, CoreMatchers.instanceOf(IllegalArgumentException.class)); } + @Test + public void test_closeAndWrapExceptions_assertionError() + { + Throwable e = null; + try { + CloseableUtils.closeAndWrapExceptions(assertionErrorCloseable); + } + catch (Throwable e1) { + e = e1; + } + + assertClosed(assertionErrorCloseable); + Assert.assertThat(e, CoreMatchers.instanceOf(AssertionError.class)); + } + @Test public void test_closeAndSuppressExceptions_null() { @@ -184,6 +200,14 @@ public void test_closeAndSuppressExceptions_runtimeException() Assert.assertEquals(1, chomped.get()); } + @Test + public void test_closeAndSuppressExceptions_assertionError() + { + CloseableUtils.closeAndSuppressExceptions(assertionErrorCloseable, chomper); + assertClosed(assertionErrorCloseable); + Assert.assertEquals(1, chomped.get()); + } + @Test public void test_closeInCatch_improper() { @@ -379,6 +403,32 @@ public void test_closeAndWrapInCatch_runtimeException() Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); } + @Test + public void test_closeAndWrapInCatch_assertionError() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), assertionErrorCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(assertionErrorCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(AssertionError.class)); + } + private static void assertClosed(final TestCloseable... closeables) { for (TestCloseable closeable : closeables) { @@ -389,10 +439,10 @@ private static void assertClosed(final TestCloseable... closeables) private static class TestCloseable implements Closeable { @Nullable - private final Exception e; + private final Throwable e; private final AtomicBoolean closed = new AtomicBoolean(false); - TestCloseable(@Nullable Exception e) + TestCloseable(@Nullable Throwable e) { this.e = e; }