diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java
index 203a0d2aed67..c3247d69832b 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java
@@ -20,6 +20,7 @@
 package org.apache.druid.java.util.http.client.response;
 
 import com.google.common.io.ByteSource;
+import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferInputStream;
@@ -56,14 +57,19 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler<InputStream, InputStream>
   @Override
   public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
   {
+    ChannelBufferInputStream channelStream = null;
     try {
-      queue.put(new ChannelBufferInputStream(response.getContent()));
+      channelStream = new ChannelBufferInputStream(response.getContent());
+      queue.put(channelStream);
     }
     catch (InterruptedException e) {
       log.error(e, "Queue appending interrupted");
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
     }
+    finally {
+      CloseQuietly.close(channelStream);
+    }
     byteCount.addAndGet(response.getContent().readableBytes());
     return ClientResponse.finished(
         new SequenceInputStream(
@@ -106,8 +112,10 @@ public ClientResponse<InputStream> handleChunk(
     final ChannelBuffer channelBuffer = chunk.getContent();
     final int bytes = channelBuffer.readableBytes();
     if (bytes > 0) {
+      ChannelBufferInputStream channelStream = null;
       try {
-        queue.put(new ChannelBufferInputStream(channelBuffer));
+        channelStream = new ChannelBufferInputStream(channelBuffer);
+        queue.put(channelStream);
         // Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong
         log.debug("Added stream. Queue length %d", queue.size());
       }
@@ -116,6 +124,9 @@ public ClientResponse<InputStream> handleChunk(
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
       }
+      finally {
+        CloseQuietly.close(channelStream);
+      }
       byteCount.addAndGet(bytes);
     } else {
       log.debug("Skipping zero length chunk");
diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java
index 6d77d8ef9c9d..aa0def755c1c 100644
--- a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java
+++ b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java
@@ -30,8 +30,8 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.io.CharStreams;
 import com.google.common.io.Files;
+import com.google.common.io.Resources;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@@ -39,8 +39,7 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
+import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
@@ -211,10 +210,10 @@ private ImmutableSortedMap<String, Set<String>> readMap(final String m
     String actualPath = mapPath;
     try {
       if (Strings.isNullOrEmpty(mapPath)) {
-        actualPath = this.getClass().getClassLoader().getResource("defaultWhiteListMap.json").getFile();
+        URL defaultWhiteListMapUrl = this.getClass().getClassLoader().getResource("defaultWhiteListMap.json");
+        actualPath = defaultWhiteListMapUrl.getFile();
         LOGGER.info("using default whiteList map located at [%s]", actualPath);
-        InputStream byteContent = this.getClass().getClassLoader().getResourceAsStream("defaultWhiteListMap.json");
-        fileContent = CharStreams.toString(new InputStreamReader(byteContent, StandardCharsets.UTF_8));
+        fileContent = Resources.toString(defaultWhiteListMapUrl, StandardCharsets.UTF_8);
       } else {
         fileContent = Files.asCharSource(new File(mapPath), StandardCharsets.UTF_8).read();
       }
diff --git a/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java
index 0f7d9a2a65b2..c61443f924a9 100644
--- a/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java
+++ b/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java
@@ -34,9 +34,10 @@
 import org.apache.druid.utils.CompressionUtils;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
+import java.nio.file.Files;
 import java.util.Map;
 
 /**
@@ -88,9 +89,9 @@ public DataSegment push(final File indexFilesDir, DataSegment segment, final boo
     int version = SegmentUtils.getVersionFromDir(indexFilesDir);
 
-    try {
+    try (final InputStream fileStream = Files.newInputStream(compressedIndexFile.toPath())) {
       long start = System.currentTimeMillis();
-      ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
+      ChunkedStorage.newWriter(indexStorage, key, fileStream)
                     .withConcurrencyLevel(CONCURRENCY).call();
       byte[] json = jsonMapper.writeValueAsBytes(segment);
       MutationBatch mutation = this.keyspace.prepareMutationBatch();
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
index feb373eaddc5..a5f1b99f5634 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
@@ -30,9 +30,9 @@
 import org.apache.druid.tasklogs.TaskLogs;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Files;
 
 public class GoogleTaskLogs implements TaskLogs
 {
@@ -66,27 +66,28 @@ public void pushTaskReports(String taskid, File reportFile) throws IOException
 
   private void pushTaskFile(final File logFile, final String taskKey) throws IOException
   {
-    FileInputStream fileStream = new FileInputStream(logFile);
-
-    InputStreamContent mediaContent = new InputStreamContent("text/plain", fileStream);
-    mediaContent.setLength(logFile.length());
-
-    try {
-      RetryUtils.retry(
-          (RetryUtils.Task<Void>) () -> {
-            storage.insert(config.getBucket(), taskKey, mediaContent);
-            return null;
-          },
-          GoogleUtils::isRetryable,
-          1,
-          5
-      );
-    }
-    catch (IOException e) {
-      throw e;
-    }
-    catch (Exception e) {
-      throw new RE(e, "Failed to upload [%s] to [%s]", logFile, taskKey);
+    try (final InputStream fileStream = Files.newInputStream(logFile.toPath())) {
+
+      InputStreamContent mediaContent = new InputStreamContent("text/plain", fileStream);
+      mediaContent.setLength(logFile.length());
+
+      try {
+        RetryUtils.retry(
+            (RetryUtils.Task<Void>) () -> {
+              storage.insert(config.getBucket(), taskKey, mediaContent);
+              return null;
+            },
+            GoogleUtils::isRetryable,
+            1,
+            5
+        );
+      }
+      catch (IOException e) {
+        throw e;
+      }
+      catch (Exception e) {
+        throw new RE(e, "Failed to upload [%s] to [%s]", logFile, taskKey);
+      }
     }
   }
diff --git a/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java b/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java
index 79d10dfeaaf5..9fe6fec5dd13 100644
--- a/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java
+++ b/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java
@@ -69,8 +69,8 @@ public void configure(Binder binder)
 
       if (stream != null) {
         log.info("Loading properties from %s", propertiesFile);
-        try {
-          fileProps.load(new InputStreamReader(stream, StandardCharsets.UTF_8));
+        try (final InputStreamReader in = new InputStreamReader(stream, StandardCharsets.UTF_8)) {
+          fileProps.load(in);
         }
         catch (IOException e) {
           throw new RuntimeException(e);
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 249060a5b8ca..c3f0fba77219 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -40,7 +40,9 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -215,8 +217,10 @@ public List<String> mergeAndGetDictionary()
 
     for (File dictFile : dictionaryFiles) {
       try (
+          final InputStream fileStream = Files.newInputStream(dictFile.toPath());
+          final LZ4BlockInputStream blockStream = new LZ4BlockInputStream(fileStream);
          final MappingIterator<String> dictIterator = spillMapper.readValues(
-              spillMapper.getFactory().createParser(new LZ4BlockInputStream(new FileInputStream(dictFile))),
+              spillMapper.getFactory().createParser(blockStream),
              spillMapper.getTypeFactory().constructType(String.class)
          )
      ) {
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index 3974915b043d..fd5c481f6cc6 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -35,7 +35,6 @@
 import org.apache.druid.segment.writeout.WriteOutBytes;
 
 import javax.annotation.Nullable;
-import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -457,10 +456,11 @@ private void writeHeaderLong(FileSmoosher smoosher, int bagSizePower)
 
   private void initializeHeaderOutLong() throws IOException
   {
     headerOutLong = new LongArrayList();
-    DataInput headerOutAsIntInput = new DataInputStream(headerOut.asInputStream());
-    for (int i = 0; i < numWritten; i++) {
-      int count = headerOutAsIntInput.readInt();
-      headerOutLong.add(count);
+    try (final DataInputStream headerOutAsIntInput = new DataInputStream(headerOut.asInputStream())) {
+      for (int i = 0; i < numWritten; i++) {
+        int count = headerOutAsIntInput.readInt();
+        headerOutLong.add(count);
+      }
     }
   }
diff --git a/server/src/main/java/org/apache/druid/server/security/TLSUtils.java b/server/src/main/java/org/apache/druid/server/security/TLSUtils.java
index 4ff34af85459..ea85b99b4994 100644
--- a/server/src/main/java/org/apache/druid/server/security/TLSUtils.java
+++ b/server/src/main/java/org/apache/druid/server/security/TLSUtils.java
@@ -32,8 +32,10 @@
 import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.X509ExtendedKeyManager;
 import javax.net.ssl.X509ExtendedTrustManager;
-import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.security.KeyManagementException;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
@@ -183,10 +185,12 @@ public static SSLContext createSSLContext(
     KeyStore trustStore = KeyStore.getInstance(trustStoreType == null
                                                ? KeyStore.getDefaultType()
                                                : trustStoreType);
-    trustStore.load(
-        new FileInputStream(trustStorePath),
-        trustStorePasswordProvider == null ? null : trustStorePasswordProvider.getPassword().toCharArray()
-    );
+    try (final InputStream trustStoreFileStream = Files.newInputStream(Paths.get(trustStorePath))) {
+      trustStore.load(
+          trustStoreFileStream,
+          trustStorePasswordProvider == null ? null : trustStorePasswordProvider.getPassword().toCharArray()
+      );
+    }
     TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(trustStoreAlgorithm == null
                                                                               ? TrustManagerFactory.getDefaultAlgorithm()
                                                                               : trustStoreAlgorithm);
@@ -197,20 +201,24 @@ public static SSLContext createSSLContext(
       KeyStore keyStore = KeyStore.getInstance(keyStoreType == null
                                                ? KeyStore.getDefaultType()
                                                : keyStoreType);
-      keyStore.load(
-          new FileInputStream(keyStorePath),
-          keyStorePasswordProvider == null ? null : keyStorePasswordProvider.getPassword().toCharArray()
-      );
+      try (final InputStream keyStoreFileStream = Files.newInputStream(Paths.get(keyStorePath))) {
+        keyStore.load(
+            keyStoreFileStream,
+            keyStorePasswordProvider == null ? null : keyStorePasswordProvider.getPassword().toCharArray()
+        );
 
-      KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(
-          keyStoreAlgorithm == null ?
-          KeyManagerFactory.getDefaultAlgorithm() : keyStoreAlgorithm
-      );
-      keyManagerFactory.init(
-          keyStore,
-          keyManagerFactoryPasswordProvider == null ? null : keyManagerFactoryPasswordProvider.getPassword().toCharArray()
-      );
-      keyManagers = createAliasedKeyManagers(keyManagerFactory.getKeyManagers(), certAlias);
+        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(
+            keyStoreAlgorithm == null ?
+            KeyManagerFactory.getDefaultAlgorithm() : keyStoreAlgorithm
+        );
+        keyManagerFactory.init(
+            keyStore,
+            keyManagerFactoryPasswordProvider == null
+            ? null
+            : keyManagerFactoryPasswordProvider.getPassword().toCharArray()
+        );
+        keyManagers = createAliasedKeyManagers(keyManagerFactory.getKeyManagers(), certAlias);
+      }
     } else {
       keyManagers = null;
    }
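
All of the hunks above apply the same pattern: the stream is either declared in a try-with-resources header or explicitly released in a finally block via CloseQuietly, so it is closed on both the success and failure paths. A minimal, self-contained sketch of that pattern, separate from the patch (the class and the countBytes helper below are hypothetical, for illustration only):

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class TryWithResourcesExample
    {
      public static long countBytes(String path) throws IOException
      {
        // The stream declared in the try(...) clause is closed automatically,
        // even if read() throws, so no explicit finally/close is needed.
        try (final InputStream in = Files.newInputStream(Paths.get(path))) {
          long total = 0;
          while (in.read() != -1) {
            total++;
          }
          return total;
        }
      }
    }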