Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.utils.CloseableUtils;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -80,7 +79,7 @@ public boolean hasNext()
return false;
}
if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
CloseQuietly.close(jp);
CloseableUtils.closeAndWrapExceptions(jp);
return false;
}
return true;
Expand Down Expand Up @@ -131,11 +130,6 @@ private void init()
@Override
public void close() throws IOException
{
Closer closer = Closer.create();
if (jp != null) {
closer.register(jp);
}
closer.register(resourceCloser);
closer.close();
CloseableUtils.closeAll(jp, resourceCloser);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.utils.CloseableUtils;
import org.apache.druid.utils.JvmUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -175,6 +176,7 @@ static <T> Sequence<T> makeOutputSequenceForQueue(
new BaseSequence.IteratorMaker<T, Iterator<T>>()
{
private boolean shouldCancelOnCleanup = true;

@Override
public Iterator<T> make()
{
Expand Down Expand Up @@ -463,18 +465,19 @@ private int computeNumTasks()

final int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1);

LOG.debug("Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] "
+ "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] "
+ "pool parallelism: [%s] pool size: [%s] steal count: [%s]",
computedNumParallelTasks,
parallelism,
getPool().getActiveThreadCount(),
runningThreadCount,
submissionCount,
getPool().getQueuedTaskCount(),
getPool().getParallelism(),
getPool().getPoolSize(),
getPool().getStealCount()
LOG.debug(
"Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] "
+ "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] "
+ "pool parallelism: [%s] pool size: [%s] steal count: [%s]",
computedNumParallelTasks,
parallelism,
getPool().getActiveThreadCount(),
runningThreadCount,
submissionCount,
getPool().getQueuedTaskCount(),
getPool().getParallelism(),
getPool().getPoolSize(),
getPool().getStealCount()
);

return computedNumParallelTasks;
Expand Down Expand Up @@ -609,7 +612,10 @@ protected void compute()
// which we want to target a 10ms task run time. smooth this value with a cumulative moving average in order
// to prevent normal jitter in processing time from skewing the next yield value too far in any direction
final long elapsedNanos = System.nanoTime() - start;
final double nextYieldAfter = Math.max((double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), 1.0);
final double nextYieldAfter = Math.max(
(double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos),
1.0
);
final long recursionDepth = metricsAccumulator.getTaskCount();
final double cumulativeMovingAverage =
(nextYieldAfter + (recursionDepth * yieldAfter)) / (recursionDepth + 1);
Expand Down Expand Up @@ -1376,6 +1382,6 @@ private static <T> void closeAllCursors(final Collection<BatchedResultsCursor<T>
{
Closer closer = Closer.create();
closer.registerAll(cursors);
CloseQuietly.close(closer);
CloseableUtils.closeAndSuppressExceptions(closer, e -> LOG.warn(e, "Failed to close result cursors"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.apache.druid.java.util.http.client;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory;
import org.apache.druid.java.util.http.client.pool.ChannelResourceFactory;
Expand All @@ -39,17 +39,12 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
*
*/
public class HttpClientInit
{
Expand Down Expand Up @@ -110,11 +105,8 @@ public void stop()

public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath, final String keyStorePassword)
{
FileInputStream in = null;
try {
try (FileInputStream in = new FileInputStream(keyStorePath)) {
final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());

in = new FileInputStream(keyStorePath);
ks.load(in, keyStorePassword.toCharArray());
in.close();

Expand All @@ -125,27 +117,10 @@ public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath

return sslContext;
}
catch (CertificateException e) {
throw new RuntimeException(e);
}
catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
catch (KeyStoreException e) {
throw new RuntimeException(e);
}
catch (KeyManagementException e) {
throw new RuntimeException(e);
}
catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
catch (IOException e) {
catch (Exception e) {
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
finally {
CloseQuietly.close(in);
}
}

private static ClientBootstrap createBootstrap(Lifecycle lifecycle, Timer timer, int bossPoolSize, int workerPoolSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.java.util.http.client.response;

import com.google.common.io.ByteSource;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.logger.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
Expand Down Expand Up @@ -57,19 +56,17 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler<I
@Override
public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
ChannelBufferInputStream channelStream = null;
try {
channelStream = new ChannelBufferInputStream(response.getContent());
try (ChannelBufferInputStream channelStream = new ChannelBufferInputStream(response.getContent())) {
queue.put(channelStream);
}
catch (IOException e) {
throw new RuntimeException(e);
}
catch (InterruptedException e) {
log.error(e, "Queue appending interrupted");
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
finally {
CloseQuietly.close(channelStream);
}
byteCount.addAndGet(response.getContent().readableBytes());
return ClientResponse.finished(
new SequenceInputStream(
Expand Down Expand Up @@ -112,21 +109,19 @@ public ClientResponse<InputStream> handleChunk(
final ChannelBuffer channelBuffer = chunk.getContent();
final int bytes = channelBuffer.readableBytes();
if (bytes > 0) {
ChannelBufferInputStream channelStream = null;
try {
channelStream = new ChannelBufferInputStream(channelBuffer);
try (ChannelBufferInputStream channelStream = new ChannelBufferInputStream(channelBuffer)) {
queue.put(channelStream);
// Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong
log.debug("Added stream. Queue length %d", queue.size());
}
catch (IOException e) {
throw new RuntimeException(e);
}
catch (InterruptedException e) {
log.warn(e, "Thread interrupted while adding to queue");
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
finally {
CloseQuietly.close(channelStream);
}
byteCount.addAndGet(bytes);
} else {
log.debug("Skipping zero length chunk");
Expand Down
Loading