From e9e3546dd5ed16c9b74507171ba94ac4b82dd063 Mon Sep 17 00:00:00 2001 From: asdf2014 Date: Mon, 19 Aug 2019 23:17:07 +0800 Subject: [PATCH 1/2] Fix resource leak --- .../SequenceInputStreamResponseHandler.java | 28 +++++++++++- ...istBasedDruidToTimelineEventConverter.java | 4 +- .../cassandra/CassandraDataSegmentPusher.java | 4 +- .../druid/storage/google/GoogleTaskLogs.java | 43 ++++++++++--------- .../apache/druid/guice/PropertiesModule.java | 4 +- .../epinephelinae/SpillingGrouper.java | 4 +- .../segment/data/GenericIndexedWriter.java | 10 ++--- .../druid/server/security/TLSUtils.java | 40 +++++++++-------- 8 files changed, 86 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java index 203a0d2aed67..61e1bec61f39 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java @@ -56,14 +56,26 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response, TrafficCop trafficCop) { + ChannelBufferInputStream channelStream = null; try { - queue.put(new ChannelBufferInputStream(response.getContent())); + channelStream = new ChannelBufferInputStream(response.getContent()); + queue.put(channelStream); } catch (InterruptedException e) { log.error(e, "Queue appending interrupted"); Thread.currentThread().interrupt(); throw new RuntimeException(e); } + finally { + if (channelStream != null) { + try { + channelStream.close(); + } + catch (IOException e) { + log.error(e, "Unable to close the channel buffer"); + } + } + } byteCount.addAndGet(response.getContent().readableBytes()); return ClientResponse.finished( new SequenceInputStream( @@ -106,8 +118,10 @@ public ClientResponse handleChunk( final ChannelBuffer channelBuffer = chunk.getContent(); final int bytes = channelBuffer.readableBytes(); if (bytes > 0) { + ChannelBufferInputStream channelStream = null; try { - queue.put(new ChannelBufferInputStream(channelBuffer)); + channelStream = new ChannelBufferInputStream(channelBuffer); + queue.put(channelStream); // Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong log.debug("Added stream. Queue length %d", queue.size()); } @@ -116,6 +130,16 @@ public ClientResponse handleChunk( Thread.currentThread().interrupt(); throw new RuntimeException(e); } + finally { + if (channelStream != null) { + try { + channelStream.close(); + } + catch (IOException e) { + log.error(e, "Unable to close the channel buffer"); + } + } + } byteCount.addAndGet(bytes); } else { log.debug("Skipping zero length chunk"); diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java index 6d77d8ef9c9d..85f338ac576c 100644 --- a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java +++ b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java @@ -214,7 +214,9 @@ private ImmutableSortedMap> readMap(final String m actualPath = this.getClass().getClassLoader().getResource("defaultWhiteListMap.json").getFile(); LOGGER.info("using default whiteList map located at [%s]", actualPath); InputStream byteContent = this.getClass().getClassLoader().getResourceAsStream("defaultWhiteListMap.json"); - fileContent = CharStreams.toString(new InputStreamReader(byteContent, StandardCharsets.UTF_8)); + try (final InputStreamReader in = new InputStreamReader(byteContent, StandardCharsets.UTF_8)) { + fileContent = CharStreams.toString(in); + } } else { fileContent = Files.asCharSource(new File(mapPath), StandardCharsets.UTF_8).read(); } diff --git a/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java index 0f7d9a2a65b2..21788ff18525 100644 --- a/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java +++ b/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java @@ -88,9 +88,9 @@ public DataSegment push(final File indexFilesDir, DataSegment segment, final boo int version = SegmentUtils.getVersionFromDir(indexFilesDir); - try { + try (final FileInputStream fileStream = new FileInputStream(compressedIndexFile)) { long start = System.currentTimeMillis(); - ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile)) + ChunkedStorage.newWriter(indexStorage, key, fileStream) .withConcurrencyLevel(CONCURRENCY).call(); byte[] json = jsonMapper.writeValueAsBytes(segment); MutationBatch mutation = this.keyspace.prepareMutationBatch(); diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java index feb373eaddc5..111823d9fd7f 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java @@ -66,27 +66,28 @@ public void pushTaskReports(String taskid, File reportFile) throws IOException private void pushTaskFile(final File logFile, final String taskKey) throws IOException { - FileInputStream fileStream = new FileInputStream(logFile); - - InputStreamContent mediaContent = new InputStreamContent("text/plain", fileStream); - mediaContent.setLength(logFile.length()); - - try { - RetryUtils.retry( - (RetryUtils.Task) () -> { - storage.insert(config.getBucket(), taskKey, mediaContent); - return null; - }, - GoogleUtils::isRetryable, - 1, - 5 - ); - } - catch (IOException e) { - throw e; - } - catch (Exception e) { - throw new RE(e, "Failed to upload [%s] to [%s]", logFile, taskKey); + try (final FileInputStream fileStream = new FileInputStream(logFile)) { + + InputStreamContent mediaContent = new InputStreamContent("text/plain", fileStream); + mediaContent.setLength(logFile.length()); + + try { + RetryUtils.retry( + (RetryUtils.Task) () -> { + storage.insert(config.getBucket(), taskKey, mediaContent); + return null; + }, + GoogleUtils::isRetryable, + 1, + 5 + ); + } + catch (IOException e) { + throw e; + } + catch (Exception e) { + throw new RE(e, "Failed to upload [%s] to [%s]", logFile, taskKey); + } } } diff --git a/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java b/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java index 79d10dfeaaf5..9fe6fec5dd13 100644 --- a/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java +++ b/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java @@ -69,8 +69,8 @@ public void configure(Binder binder) if (stream != null) { log.info("Loading properties from %s", propertiesFile); - try { - fileProps.load(new InputStreamReader(stream, StandardCharsets.UTF_8)); + try (final InputStreamReader in = new InputStreamReader(stream, StandardCharsets.UTF_8)) { + fileProps.load(in); } catch (IOException e) { throw new RuntimeException(e); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 249060a5b8ca..0ec41fedcd59 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -215,8 +215,10 @@ public List mergeAndGetDictionary() for (File dictFile : dictionaryFiles) { try ( + final FileInputStream fileStream = new FileInputStream(dictFile); + final LZ4BlockInputStream blockStream = new LZ4BlockInputStream(fileStream); final MappingIterator dictIterator = spillMapper.readValues( - spillMapper.getFactory().createParser(new LZ4BlockInputStream(new FileInputStream(dictFile))), + spillMapper.getFactory().createParser(blockStream), spillMapper.getTypeFactory().constructType(String.class) ) ) { diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java index 3974915b043d..fd5c481f6cc6 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java @@ -35,7 +35,6 @@ import org.apache.druid.segment.writeout.WriteOutBytes; import javax.annotation.Nullable; -import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; @@ -457,10 +456,11 @@ private void writeHeaderLong(FileSmoosher smoosher, int bagSizePower) private void initializeHeaderOutLong() throws IOException { headerOutLong = new LongArrayList(); - DataInput headerOutAsIntInput = new DataInputStream(headerOut.asInputStream()); - for (int i = 0; i < numWritten; i++) { - int count = headerOutAsIntInput.readInt(); - headerOutLong.add(count); + try (final DataInputStream headerOutAsIntInput = new DataInputStream(headerOut.asInputStream())) { + for (int i = 0; i < numWritten; i++) { + int count = headerOutAsIntInput.readInt(); + headerOutLong.add(count); + } } } diff --git a/server/src/main/java/org/apache/druid/server/security/TLSUtils.java b/server/src/main/java/org/apache/druid/server/security/TLSUtils.java index 4ff34af85459..a190c108ad7a 100644 --- a/server/src/main/java/org/apache/druid/server/security/TLSUtils.java +++ b/server/src/main/java/org/apache/druid/server/security/TLSUtils.java @@ -183,10 +183,12 @@ public static SSLContext createSSLContext( KeyStore trustStore = KeyStore.getInstance(trustStoreType == null ? KeyStore.getDefaultType() : trustStoreType); - trustStore.load( - new FileInputStream(trustStorePath), - trustStorePasswordProvider == null ? null : trustStorePasswordProvider.getPassword().toCharArray() - ); + try (final FileInputStream trustStoreFileStream = new FileInputStream(trustStorePath)) { + trustStore.load( + trustStoreFileStream, + trustStorePasswordProvider == null ? null : trustStorePasswordProvider.getPassword().toCharArray() + ); + } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(trustStoreAlgorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : trustStoreAlgorithm); @@ -197,20 +199,24 @@ public static SSLContext createSSLContext( KeyStore keyStore = KeyStore.getInstance(keyStoreType == null ? KeyStore.getDefaultType() : keyStoreType); - keyStore.load( - new FileInputStream(keyStorePath), - keyStorePasswordProvider == null ? null : keyStorePasswordProvider.getPassword().toCharArray() - ); + try (final FileInputStream keyStoreFileStream = new FileInputStream(keyStorePath)) { + keyStore.load( + keyStoreFileStream, + keyStorePasswordProvider == null ? null : keyStorePasswordProvider.getPassword().toCharArray() + ); - KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance( - keyStoreAlgorithm == null ? - KeyManagerFactory.getDefaultAlgorithm() : keyStoreAlgorithm - ); - keyManagerFactory.init( - keyStore, - keyManagerFactoryPasswordProvider == null ? null : keyManagerFactoryPasswordProvider.getPassword().toCharArray() - ); - keyManagers = createAliasedKeyManagers(keyManagerFactory.getKeyManagers(), certAlias); + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance( + keyStoreAlgorithm == null ? + KeyManagerFactory.getDefaultAlgorithm() : keyStoreAlgorithm + ); + keyManagerFactory.init( + keyStore, + keyManagerFactoryPasswordProvider == null + ? null + : keyManagerFactoryPasswordProvider.getPassword().toCharArray() + ); + keyManagers = createAliasedKeyManagers(keyManagerFactory.getKeyManagers(), certAlias); + } } else { keyManagers = null; } From 648f11006b81b1811bdf23af81502f5e217772ca Mon Sep 17 00:00:00 2001 From: asdf2014 Date: Tue, 20 Aug 2019 10:19:05 +0800 Subject: [PATCH 2/2] Patch comments --- .../SequenceInputStreamResponseHandler.java | 19 +++---------------- ...istBasedDruidToTimelineEventConverter.java | 13 +++++-------- .../cassandra/CassandraDataSegmentPusher.java | 5 +++-- .../druid/storage/google/GoogleTaskLogs.java | 4 ++-- .../epinephelinae/SpillingGrouper.java | 4 +++- .../druid/server/security/TLSUtils.java | 8 +++++--- 6 files changed, 21 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java index 61e1bec61f39..c3247d69832b 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java @@ -20,6 +20,7 @@ package org.apache.druid.java.util.http.client.response; import com.google.common.io.ByteSource; +import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.logger.Logger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; @@ -67,14 +68,7 @@ public ClientResponse handleResponse(HttpResponse response, Traffic throw new RuntimeException(e); } finally { - if (channelStream != null) { - try { - channelStream.close(); - } - catch (IOException e) { - log.error(e, "Unable to close the channel buffer"); - } - } + CloseQuietly.close(channelStream); } byteCount.addAndGet(response.getContent().readableBytes()); return ClientResponse.finished( @@ -131,14 +125,7 @@ public ClientResponse handleChunk( throw new RuntimeException(e); } finally { - if (channelStream != null) { - try { - channelStream.close(); - } - catch (IOException e) { - log.error(e, "Unable to close the channel buffer"); - } - } + CloseQuietly.close(channelStream); } byteCount.addAndGet(bytes); } else { diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java index 85f338ac576c..aa0def755c1c 100644 --- a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java +++ b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/WhiteListBasedDruidToTimelineEventConverter.java @@ -30,8 +30,8 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSortedMap; -import com.google.common.io.CharStreams; import com.google.common.io.Files; +import com.google.common.io.Resources; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; @@ -39,8 +39,7 @@ import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; @@ -211,12 +210,10 @@ private ImmutableSortedMap> readMap(final String m String actualPath = mapPath; try { if (Strings.isNullOrEmpty(mapPath)) { - actualPath = this.getClass().getClassLoader().getResource("defaultWhiteListMap.json").getFile(); + URL defaultWhiteListMapUrl = this.getClass().getClassLoader().getResource("defaultWhiteListMap.json"); + actualPath = defaultWhiteListMapUrl.getFile(); LOGGER.info("using default whiteList map located at [%s]", actualPath); - InputStream byteContent = this.getClass().getClassLoader().getResourceAsStream("defaultWhiteListMap.json"); - try (final InputStreamReader in = new InputStreamReader(byteContent, StandardCharsets.UTF_8)) { - fileContent = CharStreams.toString(in); - } + fileContent = Resources.toString(defaultWhiteListMapUrl, StandardCharsets.UTF_8); } else { fileContent = Files.asCharSource(new File(mapPath), StandardCharsets.UTF_8).read(); } diff --git a/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java index 21788ff18525..c61443f924a9 100644 --- a/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java +++ b/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java @@ -34,9 +34,10 @@ import org.apache.druid.utils.CompressionUtils; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.URI; +import java.nio.file.Files; import java.util.Map; /** @@ -88,7 +89,7 @@ public DataSegment push(final File indexFilesDir, DataSegment segment, final boo int version = SegmentUtils.getVersionFromDir(indexFilesDir); - try (final FileInputStream fileStream = new FileInputStream(compressedIndexFile)) { + try (final InputStream fileStream = Files.newInputStream(compressedIndexFile.toPath())) { long start = System.currentTimeMillis(); ChunkedStorage.newWriter(indexStorage, key, fileStream) .withConcurrencyLevel(CONCURRENCY).call(); diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java index 111823d9fd7f..a5f1b99f5634 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java @@ -30,9 +30,9 @@ import org.apache.druid.tasklogs.TaskLogs; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Files; public class GoogleTaskLogs implements TaskLogs { @@ -66,7 +66,7 @@ public void pushTaskReports(String taskid, File reportFile) throws IOException private void pushTaskFile(final File logFile, final String taskKey) throws IOException { - try (final FileInputStream fileStream = new FileInputStream(logFile)) { + try (final InputStream fileStream = Files.newInputStream(logFile.toPath())) { InputStreamContent mediaContent = new InputStreamContent("text/plain", fileStream); mediaContent.setLength(logFile.length()); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 0ec41fedcd59..c3f0fba77219 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -40,7 +40,9 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -215,7 +217,7 @@ public List mergeAndGetDictionary() for (File dictFile : dictionaryFiles) { try ( - final FileInputStream fileStream = new FileInputStream(dictFile); + final InputStream fileStream = Files.newInputStream(dictFile.toPath()); final LZ4BlockInputStream blockStream = new LZ4BlockInputStream(fileStream); final MappingIterator dictIterator = spillMapper.readValues( spillMapper.getFactory().createParser(blockStream), diff --git a/server/src/main/java/org/apache/druid/server/security/TLSUtils.java b/server/src/main/java/org/apache/druid/server/security/TLSUtils.java index a190c108ad7a..ea85b99b4994 100644 --- a/server/src/main/java/org/apache/druid/server/security/TLSUtils.java +++ b/server/src/main/java/org/apache/druid/server/security/TLSUtils.java @@ -32,8 +32,10 @@ import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509ExtendedKeyManager; import javax.net.ssl.X509ExtendedTrustManager; -import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; import java.security.KeyManagementException; import java.security.KeyStore; import java.security.KeyStoreException; @@ -183,7 +185,7 @@ public static SSLContext createSSLContext( KeyStore trustStore = KeyStore.getInstance(trustStoreType == null ? KeyStore.getDefaultType() : trustStoreType); - try (final FileInputStream trustStoreFileStream = new FileInputStream(trustStorePath)) { + try (final InputStream trustStoreFileStream = Files.newInputStream(Paths.get(trustStorePath))) { trustStore.load( trustStoreFileStream, trustStorePasswordProvider == null ? null : trustStorePasswordProvider.getPassword().toCharArray() @@ -199,7 +201,7 @@ public static SSLContext createSSLContext( KeyStore keyStore = KeyStore.getInstance(keyStoreType == null ? KeyStore.getDefaultType() : keyStoreType); - try (final FileInputStream keyStoreFileStream = new FileInputStream(keyStorePath)) { + try (final InputStream keyStoreFileStream = Files.newInputStream(Paths.get(keyStorePath))) { keyStore.load( keyStoreFileStream, keyStorePasswordProvider == null ? null : keyStorePasswordProvider.getPassword().toCharArray()