From de3486bf3f76daea1fb6528bd5148994edc9dbfa Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 29 Jan 2019 12:55:02 -0800 Subject: [PATCH 1/9] Add guava compatability up to 27.0.1 --- .travis.yml | 6 +- codestyle/druid-forbidden-apis.txt | 7 +- .../apache/druid/common/guava/GuavaUtils.java | 104 +++++++++++ .../org/apache/druid/indexer/TaskStatus.java | 23 +-- .../common/concurrent/ListenableFutures.java | 44 +++-- .../druid/common/guava/GuavaUtilsTest.java | 29 +++- .../firehose/kafka/KafkaSimpleConsumer.java | 83 ++++----- .../druid/indexing/common/TaskLock.java | 18 +- .../indexing/common/task/MergeTaskBase.java | 161 +++++++++--------- .../SinglePhaseParallelIndexTaskRunner.java | 4 +- .../indexing/overlord/ForkingTaskRunner.java | 3 +- .../indexing/overlord/RemoteTaskRunner.java | 6 +- .../druid/indexing/overlord/TaskLockbox.java | 14 +- .../druid/indexing/overlord/TaskQueue.java | 4 +- .../overlord/hrtr/HttpRemoteTaskRunner.java | 3 +- .../SeekableStreamIndexTaskRunner.java | 13 +- .../supervisor/SeekableStreamSupervisor.java | 19 ++- .../indexing/worker/WorkerTaskManager.java | 3 +- .../worker/executor/ExecutorLifecycle.java | 3 +- .../worker/http/TaskManagementResource.java | 4 +- .../indexing/overlord/TestTaskRunner.java | 3 +- pom.xml | 13 ++ .../druid/client/DirectDruidClient.java | 4 +- .../druid/client/HttpServerInventoryView.java | 16 +- .../druid/client/indexing/QueryStatus.java | 29 ++-- .../druid/client/indexing/TaskStatus.java | 27 +-- .../appenderator/AppenderatorImpl.java | 3 +- .../appenderator/AppenderatorPlumber.java | 6 +- .../appenderator/BaseAppenderatorDriver.java | 3 +- .../StreamAppenderatorDriver.java | 6 +- .../org/apache/druid/server/DruidNode.java | 3 +- .../server/http/HostAndPortWithScheme.java | 3 +- .../server/http/SegmentListerResource.java | 7 +- .../cache/LookupCoordinatorManager.java | 3 +- .../client/CachingClusteredClientTest.java | 3 +- .../StreamAppenderatorDriverFailTest.java | 5 +- .../ChangeRequestHistoryTest.java | 4 +- .../sql/calcite/schema/SystemSchema.java | 3 +- 38 files changed, 442 insertions(+), 250 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1e17711f7060..5d91d34069c5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -37,12 +37,12 @@ matrix: install: true script: MAVEN_OPTS='-Xmx3000m' mvn clean verify -Prat -DskipTests -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn - # strict compilation + # strict compilation and future guava compat - env: - - NAME="strict compilation" + - NAME="strict compilation and guava checks" install: true # Strict compilation requires more than 2 GB - script: MAVEN_OPTS='-Xmx3000m' mvn clean -Pstrict -pl '!benchmarks' compile test-compile -B --fail-at-end + script: MAVEN_OPTS='-Xmx3000m' mvn clean -Pstrict -Precent-guava -pl '!benchmarks' compile test-compile -B --fail-at-end # processing module test - env: diff --git a/codestyle/druid-forbidden-apis.txt b/codestyle/druid-forbidden-apis.txt index 5eb5256a2680..9d47a05bde95 100644 --- a/codestyle/druid-forbidden-apis.txt +++ b/codestyle/druid-forbidden-apis.txt @@ -34,7 +34,12 @@ java.util.Random#() @ Use ThreadLocalRandom.current() or the constructor w java.lang.Math#random() @ Use ThreadLocalRandom.current() java.util.regex.Pattern#matches(java.lang.String,java.lang.CharSequence) @ Use String.startsWith(), endsWith(), contains(), or compile and cache a Pattern explicitly org.apache.commons.io.FileUtils#getTempDirectory() @ Use org.junit.rules.TemporaryFolder for tests instead - +com.google.common.net.HostAndPort#getHostText() @ Use org.apache.druid.common.guava.GuavaUtils#getHostText instead +com.google.common.base.CharMatcher.BREAKING_WHITESPACE @ Use org.apache.druid.common.guava.GuavaUtils#breakingWhitespace() +com.google.common.base.CharMatcher#breakingWhitespace() @ Use org.apache.druid.common.guava.GuavaUtils#breakingWhitespace() +com.google.common.base.Objects#toStringHelper(java.lang.Object) @ Deprecated in future guava +com.google.common.base.Objects#toStringHelper(java.lang.Class) @ Deprecated in future guava +com.google.common.base.Objects#toStringHelper(java.lang.String) @ Deprecated in future guava @defaultMessage For performance reasons, use the utf8Base64 / encodeBase64 / encodeBase64String / decodeBase64 / decodeBase64String methods in StringUtils org.apache.commons.codec.binary.Base64 com.google.common.io.BaseEncoding.base64 \ No newline at end of file diff --git a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java index 47d996033290..9af134e00d0c 100644 --- a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java +++ b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java @@ -19,16 +19,54 @@ package org.apache.druid.common.guava; +import com.google.common.base.CharMatcher; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.net.HostAndPort; import com.google.common.primitives.Longs; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; +import javax.el.MethodNotFoundException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; /** + * */ public class GuavaUtils { + private static final CharMatcher BREAKING_WHITESPACE_INSTANCE; + + static { + CharMatcher matcher; + try { + final Method m = CharMatcher.class.getDeclaredMethod("breakingWhitespace"); + matcher = (CharMatcher) m.invoke(null); + } + catch (IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException("Failed to fetch breakingWhitespace", e); + } + catch (NoSuchMethodException e) { + try { + final Field f = CharMatcher.class.getDeclaredField("BREAKING_WHITESPACE"); + matcher = (CharMatcher) f.get(null); + } + catch (IllegalAccessException e1) { + throw new IllegalStateException("Failed to access BREAKING_WHITESPACE", e1); + } + catch (NoSuchFieldException e1) { + throw new IllegalStateException("Cannot find breaking white space in guava", e1); + } + } + if (matcher == null) { + throw new IllegalStateException("wtf!?"); + } + BREAKING_WHITESPACE_INSTANCE = matcher; + } /** * To fix semantic difference of Longs.tryParse() from Long.parseLong (Longs.tryParse() returns null for '+' started value) @@ -62,4 +100,70 @@ public static > T getEnumIfPresent(final Class enumClass, f return null; } + + /** + * Try the various methods (zero arguments) against the object. This is handy for maintaining guava compatability + * + * @param object The object to call the methods on + * @param methods The sequence of methods to call + * @param The return type + * + * @return The result of invoking the method on the object + */ + private static T tryMethods(Object object, Class assignableTo, String... methods) + { + for (String method : methods) { + try { + final Method m = object.getClass().getDeclaredMethod(method); + if (!assignableTo.isAssignableFrom(m.getReturnType())) { + throw new IAE( + "Cannot assign [%s] to [%s] in [%s] from [%s]", + m.getReturnType(), + assignableTo, + m, + object.getClass() + ); + } + try { + return (T) m.invoke(object); + } + catch (IllegalAccessException | InvocationTargetException e) { + throw new IAE("Failed to invoke [%s] on [%s]", m, object); + } + } + catch (NoSuchMethodException e) { + // Keep going + } + } + throw new MethodNotFoundException(StringUtils.format( + "Unable to find methods %s in [%s]", + Arrays.toString(methods), + object.getClass() + )); + } + + /** + * Get the host portion of the {@link HostAndPort} that has the host's text. Changes in different guava versions + * + * @param hostAndPort The object to pull from + * + * @return The host portion of the host and port + */ + public static String getHostText(HostAndPort hostAndPort) + { + return tryMethods(hostAndPort, String.class, "getHostText", "getHost"); + } + + /** + * Returns the instance of the breaking whitespace char matcher in guava. + * + * This moved starting in guava 19 + * + * @return `CharMatcher.BREAKING_WHITESPACE` or `CharMatcher.breakingWhitespace()` depending on whichever works + */ + + public static CharMatcher breakingWhitespace() + { + return BREAKING_WHITESPACE_INSTANCE; + } } diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java index 6b90fef06171..a8186f51c1ab 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java @@ -22,9 +22,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import java.util.Objects; + /** * Represents the status of a task from the perspective of the coordinator. The task may be ongoing * ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true). @@ -178,12 +179,12 @@ public TaskStatus withDuration(long _duration) @Override public String toString() { - return Objects.toStringHelper(this) - .add("id", id) - .add("status", status) - .add("duration", duration) - .add("errorMsg", errorMsg) - .toString(); + return "TaskStatus{" + + "id='" + id + '\'' + + ", status=" + status + + ", duration=" + duration + + ", errorMsg='" + errorMsg + '\'' + + '}'; } @Override @@ -196,15 +197,15 @@ public boolean equals(Object o) return false; } TaskStatus that = (TaskStatus) o; - return getDuration() == that.getDuration() && - java.util.Objects.equals(getId(), that.getId()) && + return duration == that.duration && + id.equals(that.id) && status == that.status && - java.util.Objects.equals(getErrorMsg(), that.getErrorMsg()); + Objects.equals(errorMsg, that.errorMsg); } @Override public int hashCode() { - return java.util.Objects.hash(getId(), status, getDuration(), getErrorMsg()); + return Objects.hash(id, status, duration, errorMsg); } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java index 1722e4a9c663..a7e1f5067ef5 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java +++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java @@ -42,18 +42,32 @@ public static ListenableFuture transformAsync( ) { final SettableFuture finalFuture = SettableFuture.create(); - Futures.addCallback(inFuture, new FutureCallback() - { - @Override - public void onSuccess(@Nullable I result) - { - final ListenableFuture transformFuture = transform.apply(result); - Futures.addCallback(transformFuture, new FutureCallback() + Futures.addCallback( + inFuture, + new FutureCallback() { @Override - public void onSuccess(@Nullable O result) + public void onSuccess(@Nullable I result) { - finalFuture.set(result); + final ListenableFuture transformFuture = transform.apply(result); + Futures.addCallback( + transformFuture, + new FutureCallback() + { + @Override + public void onSuccess(@Nullable O result) + { + finalFuture.set(result); + } + + @Override + public void onFailure(Throwable t) + { + finalFuture.setException(t); + } + }, + Execs.directExecutor() + ); } @Override @@ -61,15 +75,9 @@ public void onFailure(Throwable t) { finalFuture.setException(t); } - }); - } - - @Override - public void onFailure(Throwable t) - { - finalFuture.setException(t); - } - }); + }, + Execs.directExecutor() + ); return finalFuture; } } diff --git a/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java b/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java index 6bd764f0a5de..c043e7b46622 100644 --- a/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java +++ b/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java @@ -19,19 +19,13 @@ package org.apache.druid.common.guava; +import com.google.common.net.HostAndPort; import com.google.common.primitives.Longs; import org.junit.Assert; import org.junit.Test; public class GuavaUtilsTest { - enum MyEnum - { - ONE, - TWO, - BUCKLE_MY_SHOE - } - @Test public void testParseLong() { @@ -53,4 +47,25 @@ public void testGetEnumIfPresent() Assert.assertEquals(MyEnum.BUCKLE_MY_SHOE, GuavaUtils.getEnumIfPresent(MyEnum.class, "BUCKLE_MY_SHOE")); Assert.assertEquals(null, GuavaUtils.getEnumIfPresent(MyEnum.class, "buckle_my_shoe")); } + + @Test + public void testHostAndPorthostText() + { + Assert.assertEquals("localhost", GuavaUtils.getHostText(HostAndPort.fromString("localhost"))); + Assert.assertEquals("127.0.0.1", GuavaUtils.getHostText(HostAndPort.fromString("127.0.0.1"))); + Assert.assertEquals("::1", GuavaUtils.getHostText(HostAndPort.fromString("::1"))); + } + + @Test + public void testBreakingWhitespaceExists() + { + Assert.assertNotNull(GuavaUtils.breakingWhitespace()); + } + + enum MyEnum + { + ONE, + TWO, + BUCKLE_MY_SHOE + } } diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java index 038fb2db90f4..083dbbd7e172 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java +++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java @@ -38,6 +38,7 @@ import kafka.javaapi.TopicMetadataResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.common.logger.Logger; @@ -62,31 +63,29 @@ public class KafkaSimpleConsumer public static final List EMPTY_MSGS = new ArrayList<>(); private static final Logger log = new Logger(KafkaSimpleConsumer.class); - + private static final int SO_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30); + private static final int BUFFER_SIZE = 65536; + private static final long RETRY_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(1); + private static final int FETCH_SIZE = 100_000_000; private final List allBrokers; private final String topic; private final int partitionId; private final String clientId; private final String leaderLookupClientId; private final boolean earliest; - private volatile Broker leaderBroker; private List replicaBrokers; private SimpleConsumer consumer = null; - private static final int SO_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30); - private static final int BUFFER_SIZE = 65536; - private static final long RETRY_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(1); - private static final int FETCH_SIZE = 100_000_000; - public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List brokers, boolean earliest) { List brokerList = new ArrayList<>(); for (String broker : brokers) { HostAndPort brokerHostAndPort = HostAndPort.fromString(broker); + final String hostText = GuavaUtils.getHostText(brokerHostAndPort); Preconditions.checkArgument( - brokerHostAndPort.getHostText() != null && - !brokerHostAndPort.getHostText().isEmpty() && + hostText != null && + !hostText.isEmpty() && brokerHostAndPort.hasPort(), "kafka broker [%s] is not valid, must be :", broker @@ -126,35 +125,6 @@ private void ensureConsumer(Broker leader) throws InterruptedException } } - public static class BytesMessageWithOffset - { - final byte[] msg; - final long offset; - final int partition; - - public BytesMessageWithOffset(byte[] msg, long offset, int partition) - { - this.msg = msg; - this.offset = offset; - this.partition = partition; - } - - public int getPartition() - { - return partition; - } - - public byte[] message() - { - return msg; - } - - public long offset() - { - return offset; - } - } - private Iterable filterAndDecode(Iterable kafkaMessages, final long offset) { return FunctionalIterable @@ -304,7 +274,13 @@ private PartitionMetadata findLeader() throws InterruptedException SimpleConsumer consumer = null; try { log.info("Finding new leader from Kafka brokers, try broker [%s]", broker.toString()); - consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, leaderLookupClientId); + consumer = new SimpleConsumer( + GuavaUtils.getHostText(broker), + broker.getPort(), + SO_TIMEOUT_MILLIS, + BUFFER_SIZE, + leaderLookupClientId + ); TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic))); List metaData = resp.topicsMetadata(); @@ -388,4 +364,33 @@ private void ensureNotInterrupted(Exception e) throws InterruptedException throw new InterruptedException(); } } + + public static class BytesMessageWithOffset + { + final byte[] msg; + final long offset; + final int partition; + + public BytesMessageWithOffset(byte[] msg, long offset, int partition) + { + this.msg = msg; + this.offset = offset; + this.partition = partition; + } + + public int getPartition() + { + return partition; + } + + public byte[] message() + { + return msg; + } + + public long offset() + { + return offset; + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java index 865b053b95c4..dbc67334c9c5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java @@ -172,14 +172,14 @@ public int hashCode() @Override public String toString() { - return Objects.toStringHelper(this) - .add("type", type) - .add("groupId", groupId) - .add("dataSource", dataSource) - .add("interval", interval) - .add("version", version) - .add("priority", priority) - .add("revoked", revoked) - .toString(); + return "TaskLock{" + + "type=" + type + + ", groupId='" + groupId + '\'' + + ", dataSource='" + dataSource + '\'' + + ", interval=" + interval + + ", version='" + version + '\'' + + ", priority=" + priority + + ", revoked=" + revoked + + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTaskBase.java index 2718be043863..ccef1f0de4d3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTaskBase.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Joiner; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; @@ -61,6 +60,7 @@ import java.util.stream.Collectors; /** + * */ public abstract class MergeTaskBase extends AbstractFixedIntervalTask { @@ -114,6 +114,80 @@ public boolean apply(@Nullable DataSegment segment) this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; } + private static String computeProcessingID(final String dataSource, final List segments) + { + final String segmentIDs = Joiner.on("_").join( + Iterables.transform( + Ordering.natural().sortedCopy(segments), new Function() + { + @Override + public String apply(DataSegment x) + { + return StringUtils.format( + "%s_%s_%s_%s", + x.getInterval().getStart(), + x.getInterval().getEnd(), + x.getVersion(), + x.getShardSpec().getPartitionNum() + ); + } + } + ) + ); + + return StringUtils.format( + "%s_%s", + dataSource, + Hashing.sha1().hashString(segmentIDs, StandardCharsets.UTF_8).toString() + ); + } + + private static Interval computeMergedInterval(final List segments) + { + Preconditions.checkArgument(segments.size() > 0, "segments.size() > 0"); + + DateTime start = null; + DateTime end = null; + + for (final DataSegment segment : segments) { + if (start == null || segment.getInterval().getStart().isBefore(start)) { + start = segment.getInterval().getStart(); + } + + if (end == null || segment.getInterval().getEnd().isAfter(end)) { + end = segment.getInterval().getEnd(); + } + } + + return new Interval(start, end); + } + + private static DataSegment computeMergedSegment( + final String dataSource, + final String version, + final List segments + ) + { + final Interval mergedInterval = computeMergedInterval(segments); + final Set mergedDimensions = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + final Set mergedMetrics = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + + for (DataSegment segment : segments) { + mergedDimensions.addAll(segment.getDimensions()); + mergedMetrics.addAll(segment.getMetrics()); + } + + return DataSegment.builder() + .dataSource(dataSource) + .interval(mergedInterval) + .version(version) + .binaryVersion(IndexIO.CURRENT_VERSION_ID) + .shardSpec(NoneShardSpec.instance()) + .dimensions(Lists.newArrayList(mergedDimensions)) + .metrics(Lists.newArrayList(mergedMetrics)) + .build(); + } + protected void verifyInputSegments(List segments) { // Verify segments are all unsharded @@ -251,86 +325,9 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() @Override public String toString() { - return Objects.toStringHelper(this) - .add("id", getId()) - .add("dataSource", getDataSource()) - .add("interval", getInterval()) - .add("segments", segments) - .add("segmentWriteOutMediumFactory", segmentWriteOutMediumFactory) - .toString(); - } - - private static String computeProcessingID(final String dataSource, final List segments) - { - final String segmentIDs = Joiner.on("_").join( - Iterables.transform( - Ordering.natural().sortedCopy(segments), new Function() - { - @Override - public String apply(DataSegment x) - { - return StringUtils.format( - "%s_%s_%s_%s", - x.getInterval().getStart(), - x.getInterval().getEnd(), - x.getVersion(), - x.getShardSpec().getPartitionNum() - ); - } - } - ) - ); - - return StringUtils.format( - "%s_%s", - dataSource, - Hashing.sha1().hashString(segmentIDs, StandardCharsets.UTF_8).toString() - ); - } - - private static Interval computeMergedInterval(final List segments) - { - Preconditions.checkArgument(segments.size() > 0, "segments.size() > 0"); - - DateTime start = null; - DateTime end = null; - - for (final DataSegment segment : segments) { - if (start == null || segment.getInterval().getStart().isBefore(start)) { - start = segment.getInterval().getStart(); - } - - if (end == null || segment.getInterval().getEnd().isAfter(end)) { - end = segment.getInterval().getEnd(); - } - } - - return new Interval(start, end); - } - - private static DataSegment computeMergedSegment( - final String dataSource, - final String version, - final List segments - ) - { - final Interval mergedInterval = computeMergedInterval(segments); - final Set mergedDimensions = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); - final Set mergedMetrics = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); - - for (DataSegment segment : segments) { - mergedDimensions.addAll(segment.getDimensions()); - mergedMetrics.addAll(segment.getMetrics()); - } - - return DataSegment.builder() - .dataSource(dataSource) - .interval(mergedInterval) - .version(version) - .binaryVersion(IndexIO.CURRENT_VERSION_ID) - .shardSpec(NoneShardSpec.instance()) - .dimensions(Lists.newArrayList(mergedDimensions)) - .metrics(Lists.newArrayList(mergedMetrics)) - .build(); + return "MergeTaskBase{" + + "segments=" + segments + + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java index d04315668e66..df557b5a23b3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java @@ -36,6 +36,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.MonitorEntry; import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; @@ -437,7 +438,8 @@ public void onFailure(Throwable t) log.error(t, "Error while running a task for subTaskSpec[%s]", spec); taskCompleteEvents.offer(SubTaskCompleteEvent.fail(spec, t)); } - } + }, + Execs.directExecutor() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index c3a0bc94ded3..25104927712c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -42,6 +42,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import org.apache.commons.io.FileUtils; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; @@ -840,7 +841,7 @@ public boolean matches(char c) if (inQuotes) { return false; } - return CharMatcher.BREAKING_WHITESPACE.matches(c); + return GuavaUtils.breakingWhitespace().matches(c); } } ).omitEmptyStrings().split(string).iterator(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index b83ea673c9f3..e39fc58b8e6c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -264,7 +264,8 @@ public void onFailure(Throwable throwable) waitingForMonitor.notifyAll(); } } - } + }, + Execs.directExecutor() ); break; case CHILD_UPDATED: @@ -1175,7 +1176,8 @@ public void onFailure(Throwable t) { removedWorkerCleanups.remove(worker, cleanupTask); } - } + }, + Execs.directExecutor() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index b8402fab3bed..b8d172313881 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -196,7 +196,7 @@ public int compare(Pair left, Pair right) * @param interval interval to lock * * @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a - * {@link LockResult#revoked} flag. + * {@link LockResult#isRevoked} flag. * * @throws InterruptedException if the current thread is interrupted */ @@ -231,7 +231,7 @@ public LockResult lock( * @param timeoutMs maximum time to wait * * @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a - * {@link LockResult#revoked} flag. + * {@link LockResult#isRevoked} flag. * * @throws InterruptedException if the current thread is interrupted */ @@ -268,7 +268,7 @@ public LockResult lock( * @param interval interval to lock * * @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a - * {@link LockResult#revoked} flag. + * {@link LockResult#isRevoked} flag. * * @throws IllegalStateException if the task is not a valid active task */ @@ -1094,10 +1094,10 @@ public int hashCode() @Override public String toString() { - return Objects.toStringHelper(this) - .add("taskLock", taskLock) - .add("taskIds", taskIds) - .toString(); + return "TaskLockPosse{" + + "taskLock=" + taskLock + + ", taskIds=" + taskIds + + '}'; } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index ef30f15f2259..9351e18b99c9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -38,6 +38,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -535,7 +536,8 @@ private void handleStatus(final TaskStatus status) .emit(); } } - } + }, + Execs.directExecutor() ); return statusFuture; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 5cb99ba569a1..71a0f3bca86b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -623,7 +623,8 @@ public void onFailure(Throwable t) { removedWorkerCleanups.remove(workerHostAndPort, cleanupTask); } - } + }, + Execs.directExecutor() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 8e502f0387eb..18de9f44e9d9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -66,6 +66,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.collect.Utils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.indexing.RealtimeIOConfig; @@ -620,7 +621,8 @@ public void onFailure(Throwable t) log.error("Persist failed, dying"); backgroundThreadException = t; } - } + }, + Execs.directExecutor() ); } } @@ -875,7 +877,8 @@ private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) } else { return publishedSegmentsAndMetadata; } - } + }, + Execs.directExecutor() ); publishWaitList.add(publishFuture); @@ -924,7 +927,8 @@ public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata) handoffFuture.set(handoffSegmentsAndMetadata); return null; } - } + }, + Execs.directExecutor() ); } @@ -934,7 +938,8 @@ public void onFailure(Throwable t) log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata); handoffFuture.setException(t); } - } + }, + Execs.directExecutor() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 25250ac0487c..17906e42dba0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -900,11 +900,12 @@ private Map> getCurrentTotalStats() futures.add( Futures.transform( taskClient.getMovingAveragesAsync(taskId), - (Function, StatsFromTaskResult>) (currentStats) -> new StatsFromTaskResult( + (currentStats) -> new StatsFromTaskResult( groupId, taskId, currentStats - ) + ), + Execs.directExecutor() ) ); groupAndTaskIds.add(new Pair<>(groupId, taskId)); @@ -918,11 +919,12 @@ private Map> getCurrentTotalStats() futures.add( Futures.transform( taskClient.getMovingAveragesAsync(taskId), - (Function, StatsFromTaskResult>) (currentStats) -> new StatsFromTaskResult( + (currentStats) -> new StatsFromTaskResult( groupId, taskId, currentStats - ) + ), + Execs.directExecutor() ) ); groupAndTaskIds.add(new Pair<>(groupId, taskId)); @@ -1625,7 +1627,8 @@ public Void apply(@Nullable Boolean result) } return null; } - } + }, + Execs.directExecutor() ); } @@ -1908,7 +1911,8 @@ public Map apply(@Nullable Object input) { return null; } - } + }, + Execs.directExecutor() ); } @@ -2536,7 +2540,8 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept } return null; - } + }, + Execs.directExecutor() ) ).collect(Collectors.toList()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 0ec7b41089ff..07573f43c4f0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -247,7 +247,8 @@ public void onFailure(Throwable t) { submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId()))); } - } + }, + Execs.directExecutor() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java index d3b531a3a464..b5ff50dc6a5d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -201,7 +201,8 @@ public TaskStatus apply(TaskStatus taskStatus) throw Throwables.propagate(e); } } - } + }, + Execs.directExecutor() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java index 85f661095989..62bf274f3372 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.hrtr.WorkerHolder; import org.apache.druid.indexing.worker.WorkerHistoryItem; import org.apache.druid.indexing.worker.WorkerTaskMonitor; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordination.ChangeRequestHistory; import org.apache.druid.server.coordination.ChangeRequestsSnapshot; @@ -191,7 +192,8 @@ public void onFailure(Throwable th) log.debug(ex, "Request timed out or closed already."); } } - } + }, + Execs.directExecutor() ); asyncContext.setTimeout(timeout); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java index c9163611c614..871c71638168 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java @@ -229,7 +229,8 @@ public void onFailure(Throwable t) { runningItems.remove(taskRunnerWorkItem); } - } + }, + Execs.directExecutor() ); return statusFuture; diff --git a/pom.xml b/pom.xml index 668ba3e17702..e10b30b7f054 100644 --- a/pom.xml +++ b/pom.xml @@ -1388,6 +1388,19 @@ + + + recent-guava + + 27.0.1-jre + + + + hadoop-guava + + 11.0.2 + + diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 3f86efd18c67..e1cf317338d8 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; @@ -530,7 +531,8 @@ public void onFailure(Throwable t) } } } - } + }, + Execs.directExecutor() ); } catch (IOException e) { diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index be0811769f2c..d09e34f2a2b5 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -29,6 +29,7 @@ import com.google.common.collect.Maps; import com.google.common.net.HostAndPort; import com.google.inject.Inject; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DiscoveryDruidNode; @@ -100,18 +101,14 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer * DataSources and not maintaining any segment information of other DataSources in memory. */ private final Predicate> defaultFilter; - private volatile Predicate> finalPredicate; - // For each queryable server, a name -> DruidServerHolder entry is kept private final ConcurrentHashMap servers = new ConcurrentHashMap<>(); - - private volatile ScheduledExecutorService executor; - private final HttpClient httpClient; private final ObjectMapper smileMapper; private final HttpServerInventoryViewConfig config; - private final CountDownLatch inventoryInitializationLatch = new CountDownLatch(1); + private volatile Predicate> finalPredicate; + private volatile ScheduledExecutorService executor; @Inject public HttpServerInventoryView( @@ -511,7 +508,12 @@ private class DruidServerHolder smileMapper, httpClient, executor, - new URL(druidServer.getScheme(), hostAndPort.getHostText(), hostAndPort.getPort(), "/"), + new URL( + druidServer.getScheme(), + GuavaUtils.getHostText(hostAndPort), + hostAndPort.getPort(), + "/" + ), "/druid-internal/v1/segments", SEGMENT_LIST_RESP_TYPE_REF, config.getServerTimeout(), diff --git a/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java b/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java index b9348731bdae..25f6070720d1 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java +++ b/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; /** @@ -30,13 +29,6 @@ */ public class QueryStatus { - public enum Status - { - RUNNING, - SUCCESS, - FAILED - } - private final String id; private final Status status; private final long duration; @@ -53,6 +45,16 @@ public QueryStatus( this.duration = duration; } + @Override + public String toString() + { + return "QueryStatus{" + + "id='" + id + '\'' + + ", status=" + status + + ", duration=" + duration + + '}'; + } + @JsonProperty("id") public String getId() { @@ -77,13 +79,10 @@ public boolean isComplete() return status != Status.RUNNING; } - @Override - public String toString() + public enum Status { - return Objects.toStringHelper(this) - .add("id", id) - .add("status", status) - .add("duration", duration) - .toString(); + RUNNING, + SUCCESS, + FAILED } } diff --git a/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java b/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java index cb506876cd36..074c04595072 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java +++ b/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java @@ -21,10 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import org.apache.druid.indexer.TaskState; +import java.util.Objects; + /** * Should be synced with org.apache.druid.indexing.common.TaskStatus */ @@ -68,6 +69,16 @@ public long getDuration() return duration; } + @Override + public String toString() + { + return "TaskStatus{" + + "id='" + id + '\'' + + ", status=" + status + + ", duration=" + duration + + '}'; + } + @Override public boolean equals(Object o) { @@ -79,23 +90,13 @@ public boolean equals(Object o) } TaskStatus that = (TaskStatus) o; return duration == that.duration && - java.util.Objects.equals(id, that.id) && + id.equals(that.id) && status == that.status; } @Override public int hashCode() { - return java.util.Objects.hash(id, status, duration); - } - - @Override - public String toString() - { - return Objects.toStringHelper(this) - .add("id", id) - .add("status", status) - .add("duration", duration) - .toString(); + return Objects.hash(id, status, duration); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 83be7dc24310..dee9b452caf6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -315,7 +315,8 @@ public void onFailure(Throwable t) { persistError = t; } - } + }, + Execs.directExecutor() ); } else { isPersistRequired = true; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java index ed0b8e43e7dc..8727406b54e3 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java @@ -328,7 +328,8 @@ public void onFailure(Throwable e) // TODO: Retry? log.warn(e, "Failed to drop segment: %s", identifier); } - } + }, + Execs.directExecutor() ); } @@ -481,7 +482,8 @@ public void onFailure(Throwable e) log.warn(e, "Failed to push [%,d] segments.", segmentsToPush.size()); errorHandler.apply(e); } - } + }, + Execs.directExecutor() ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index a644e607fd3b..8b5550feaaf6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -522,7 +522,8 @@ ListenableFuture dropInBackground(SegmentsAndMetadata segme segmentsAndMetadata.getSegments(), metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata() ); - } + }, + Execs.directExecutor() ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 2599387dd158..754cc5022c95 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -284,7 +284,8 @@ public ListenableFuture publish( sequenceNames.forEach(segments::remove); } return sam; - } + }, + Execs.directExecutor() ); } @@ -363,7 +364,8 @@ public void onFailure(Throwable e) numRemainingHandoffSegments.decrementAndGet(); resultFuture.setException(e); } - } + }, + Execs.directExecutor() ); } ); diff --git a/server/src/main/java/org/apache/druid/server/DruidNode.java b/server/src/main/java/org/apache/druid/server/DruidNode.java index 8454dc673948..827026a6e8e1 100644 --- a/server/src/main/java/org/apache/druid/server/DruidNode.java +++ b/server/src/main/java/org/apache/druid/server/DruidNode.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; import com.google.inject.name.Named; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.common.utils.SocketUtil; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -146,7 +147,7 @@ private void init(String serviceName, String host, boolean bindOnHost, Integer p Integer portFromHostConfig; if (host != null) { hostAndPort = HostAndPort.fromString(host); - host = hostAndPort.getHostText(); + host = GuavaUtils.getHostText(hostAndPort); portFromHostConfig = hostAndPort.hasPort() ? hostAndPort.getPort() : null; if (plainTextPort != null && portFromHostConfig != null && !plainTextPort.equals(portFromHostConfig)) { throw new IAE("Conflicting host:port [%s] and port [%d] settings", host, plainTextPort); diff --git a/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java b/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java index 428a51d14018..35fbe8cd2d87 100644 --- a/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java +++ b/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.StringUtils; public class HostAndPortWithScheme @@ -69,7 +70,7 @@ public String getScheme() public String getHostText() { - return hostAndPort.getHostText(); + return GuavaUtils.getHostText(hostAndPort); } public int getPort() diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java index 3a531b7d384d..91f2f482f0fd 100644 --- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java +++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java @@ -29,6 +29,7 @@ import org.apache.druid.client.HttpServerInventoryView; import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer; import org.apache.druid.server.coordination.ChangeRequestHistory; @@ -204,7 +205,8 @@ public void onFailure(Throwable th) log.debug(ex, "Request timed out or closed already."); } } - } + }, + Execs.directExecutor() ); asyncContext.setTimeout(timeout); @@ -315,7 +317,8 @@ public void onFailure(Throwable th) log.debug(ex, "Request timed out or closed already."); } } - } + }, + Execs.directExecutor() ); asyncContext.setTimeout(timeout); diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java index 0f90c83935c8..d443b25552ec 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java @@ -406,7 +406,8 @@ public void onFailure(Throwable t) LOG.makeAlert(t, "Background lookup manager exited with error!").emit(); } } - } + }, + Execs.directExecutor() ); LOG.debug("Started"); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 2eff321d78f0..add883bf775b 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -372,7 +372,8 @@ public void onFailure(Throwable t) { pair.lhs.setException(t); } - } + }, + Execs.directExecutor() ); } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index 79b575434bc9..144bfd48aa20 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -20,7 +20,6 @@ package org.apache.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -35,6 +34,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; @@ -491,7 +491,8 @@ public ListenableFuture push( .collect(Collectors.toList()); return Futures.transform( persistAll(committer), - (Function) commitMetadata -> new SegmentsAndMetadata(segments, commitMetadata) + commitMetadata -> new SegmentsAndMetadata(segments, commitMetadata), + Execs.directExecutor() ); } else { if (interruptPush) { diff --git a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java index d513aebb92d3..c747ea6a0e55 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.utils.CircularBuffer; import org.junit.Assert; import org.junit.Test; @@ -145,7 +146,8 @@ public void onFailure(Throwable t) { callbackExcecuted.set(true); } - } + }, + Execs.directExecutor() ); future.cancel(true); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 772c62886f4e..61def6c37b4b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -48,6 +48,7 @@ import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.RE; @@ -755,7 +756,7 @@ private static String extractHost(@Nullable final String hostAndPort) return null; } - return HostAndPort.fromString(hostAndPort).getHostText(); + return GuavaUtils.getHostText(HostAndPort.fromString(hostAndPort)); } private static int extractPort(@Nullable final String hostAndPort) From 3926f5bb83db97f8a4753da26701003aaac575a9 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 29 Jan 2019 13:16:53 -0800 Subject: [PATCH 2/9] Use `Runnable::run` instead of direct executor --- .../druid/java/util/common/concurrent/ListenableFutures.java | 4 ++-- .../druid/query/lookup/KafkaLookupExtractorFactory.java | 2 +- .../batch/parallel/SinglePhaseParallelIndexTaskRunner.java | 3 +-- .../org/apache/druid/indexing/overlord/RemoteTaskRunner.java | 4 ++-- .../java/org/apache/druid/indexing/overlord/TaskQueue.java | 3 +-- .../druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java | 2 +- .../seekablestream/SeekableStreamIndexTaskRunner.java | 4 ++-- .../org/apache/druid/indexing/worker/WorkerTaskManager.java | 2 +- .../druid/indexing/worker/http/TaskManagementResource.java | 3 +-- .../org/apache/druid/indexing/overlord/TestTaskRunner.java | 2 +- .../main/java/org/apache/druid/client/DirectDruidClient.java | 3 +-- .../apache/druid/client/cache/BackgroundCachePopulator.java | 3 +-- .../segment/realtime/appenderator/AppenderatorImpl.java | 2 +- .../segment/realtime/appenderator/AppenderatorPlumber.java | 4 ++-- .../realtime/appenderator/StreamAppenderatorDriver.java | 2 +- .../org/apache/druid/server/http/SegmentListerResource.java | 5 ++--- .../druid/server/lookup/cache/LookupCoordinatorManager.java | 2 +- .../org/apache/druid/client/CachingClusteredClientTest.java | 2 +- .../druid/server/coordination/ChangeRequestHistoryTest.java | 3 +-- 19 files changed, 24 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java index a7e1f5067ef5..73f1ea6975f2 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java +++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java @@ -66,7 +66,7 @@ public void onFailure(Throwable t) finalFuture.setException(t); } }, - Execs.directExecutor() + Runnable::run ); } @@ -76,7 +76,7 @@ public void onFailure(Throwable t) finalFuture.setException(t); } }, - Execs.directExecutor() + Runnable::run ); return finalFuture; } diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java index e65c7a382184..1b421e73de23 100644 --- a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java +++ b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java @@ -261,7 +261,7 @@ public void onFailure(Throwable t) } } }, - Execs.directExecutor() + Runnable::run ); this.future = future; final Stopwatch stopwatch = Stopwatch.createStarted(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java index df557b5a23b3..30181e61f635 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java @@ -36,7 +36,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.MonitorEntry; import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; @@ -439,7 +438,7 @@ public void onFailure(Throwable t) taskCompleteEvents.offer(SubTaskCompleteEvent.fail(spec, t)); } }, - Execs.directExecutor() + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index e39fc58b8e6c..4efb0a80434d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -265,7 +265,7 @@ public void onFailure(Throwable throwable) } } }, - Execs.directExecutor() + Runnable::run ); break; case CHILD_UPDATED: @@ -1177,7 +1177,7 @@ public void onFailure(Throwable t) removedWorkerCleanups.remove(worker, cleanupTask); } }, - Execs.directExecutor() + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 9351e18b99c9..5c23ab131393 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -38,7 +38,6 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -537,7 +536,7 @@ private void handleStatus(final TaskStatus status) } } }, - Execs.directExecutor() + Runnable::run ); return statusFuture; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 71a0f3bca86b..43a96f2a7bb0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -624,7 +624,7 @@ public void onFailure(Throwable t) removedWorkerCleanups.remove(workerHostAndPort, cleanupTask); } }, - Execs.directExecutor() + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 18de9f44e9d9..0c95f9b47bf3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -622,7 +622,7 @@ public void onFailure(Throwable t) backgroundThreadException = t; } }, - Execs.directExecutor() + Runnable::run ); } } @@ -939,7 +939,7 @@ public void onFailure(Throwable t) handoffFuture.setException(t); } }, - Execs.directExecutor() + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 07573f43c4f0..77e4ae81ce7c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -248,7 +248,7 @@ public void onFailure(Throwable t) submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId()))); } }, - Execs.directExecutor() + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java index 62bf274f3372..62e1392e249d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java @@ -32,7 +32,6 @@ import org.apache.druid.indexing.overlord.hrtr.WorkerHolder; import org.apache.druid.indexing.worker.WorkerHistoryItem; import org.apache.druid.indexing.worker.WorkerTaskMonitor; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordination.ChangeRequestHistory; import org.apache.druid.server.coordination.ChangeRequestsSnapshot; @@ -193,7 +192,7 @@ public void onFailure(Throwable th) } } }, - Execs.directExecutor() + Runnable::run ); asyncContext.setTimeout(timeout); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java index 871c71638168..bd75dfb64ad3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java @@ -230,7 +230,7 @@ public void onFailure(Throwable t) runningItems.remove(taskRunnerWorkItem); } }, - Execs.directExecutor() + Runnable::run ); return statusFuture; diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index e1cf317338d8..ca57540b4dd5 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -32,7 +32,6 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; @@ -532,7 +531,7 @@ public void onFailure(Throwable t) } } }, - Execs.directExecutor() + Runnable::run ); } catch (IOException e) { diff --git a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java index 0e78e994fd04..b6888fe66f56 100644 --- a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java +++ b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java @@ -27,7 +27,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.logger.Logger; @@ -102,7 +101,7 @@ public void onFailure(Throwable t) exec ); }, - Execs.directExecutor() + Runnable::run ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index dee9b452caf6..4cd0194a8bad 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -316,7 +316,7 @@ public void onFailure(Throwable t) persistError = t; } }, - Execs.directExecutor() + Runnable::run ); } else { isPersistRequired = true; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java index 8727406b54e3..b3a63a85da78 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java @@ -329,7 +329,7 @@ public void onFailure(Throwable e) log.warn(e, "Failed to drop segment: %s", identifier); } }, - Execs.directExecutor() + Runnable::run ); } @@ -483,7 +483,7 @@ public void onFailure(Throwable e) errorHandler.apply(e); } }, - Execs.directExecutor() + Runnable::run ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 754cc5022c95..7c3f0dd0eb2f 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -365,7 +365,7 @@ public void onFailure(Throwable e) resultFuture.setException(e); } }, - Execs.directExecutor() + Runnable::run ); } ); diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java index 91f2f482f0fd..619d7703a844 100644 --- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java +++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java @@ -29,7 +29,6 @@ import org.apache.druid.client.HttpServerInventoryView; import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Smile; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer; import org.apache.druid.server.coordination.ChangeRequestHistory; @@ -206,7 +205,7 @@ public void onFailure(Throwable th) } } }, - Execs.directExecutor() + Runnable::run ); asyncContext.setTimeout(timeout); @@ -318,7 +317,7 @@ public void onFailure(Throwable th) } } }, - Execs.directExecutor() + Runnable::run ); asyncContext.setTimeout(timeout); diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java index d443b25552ec..da04e6efac48 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java @@ -407,7 +407,7 @@ public void onFailure(Throwable t) } } }, - Execs.directExecutor() + Runnable::run ); LOG.debug("Started"); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index add883bf775b..49c38128e3d0 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -373,7 +373,7 @@ public void onFailure(Throwable t) pair.lhs.setException(t); } }, - Execs.directExecutor() + Runnable::run ); } } diff --git a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java index c747ea6a0e55..082007591b59 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java @@ -22,7 +22,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.utils.CircularBuffer; import org.junit.Assert; import org.junit.Test; @@ -147,7 +146,7 @@ public void onFailure(Throwable t) callbackExcecuted.set(true); } }, - Execs.directExecutor() + Runnable::run ); future.cancel(true); From cf864267e20c5ee4544247ac74ac188b36c9a454 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 29 Jan 2019 13:25:37 -0800 Subject: [PATCH 3/9] Clean up a test compile --- .../StreamAppenderatorDriverFailTest.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index 144bfd48aa20..8ba5979bbf39 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; @@ -34,7 +35,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ListenableFutures; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; @@ -67,6 +68,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import java.util.stream.Collectors; public class StreamAppenderatorDriverFailTest extends EasyMockSupport @@ -489,10 +491,13 @@ public ListenableFuture push( ) ) .collect(Collectors.toList()); - return Futures.transform( + return ListenableFutures.transformAsync( persistAll(committer), - commitMetadata -> new SegmentsAndMetadata(segments, commitMetadata), - Execs.directExecutor() + input -> { + final SettableFuture future = SettableFuture.create(); + future.set(new SegmentsAndMetadata(segments, input)); + return future; + } ); } else { if (interruptPush) { From 9c6d33bc0e6c141193ef3934a1ca2f8b9d4f444d Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 29 Jan 2019 13:33:51 -0800 Subject: [PATCH 4/9] Running mvn check helps --- .../realtime/appenderator/StreamAppenderatorDriverFailTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index 8ba5979bbf39..dde7af8d34c1 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -68,7 +68,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Function; import java.util.stream.Collectors; public class StreamAppenderatorDriverFailTest extends EasyMockSupport From b04ef2e8ccac5668bb64ff75d201fa502c52a027 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 29 Jan 2019 13:40:31 -0800 Subject: [PATCH 5/9] Remove more diret executor calls --- .../seekablestream/SeekableStreamIndexTaskRunner.java | 5 ++--- .../supervisor/SeekableStreamSupervisor.java | 10 +++++----- .../indexing/worker/executor/ExecutorLifecycle.java | 2 +- .../realtime/appenderator/BaseAppenderatorDriver.java | 2 +- .../appenderator/StreamAppenderatorDriver.java | 2 +- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 0c95f9b47bf3..61439870f533 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -66,7 +66,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.collect.Utils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.indexing.RealtimeIOConfig; @@ -878,7 +877,7 @@ private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) return publishedSegmentsAndMetadata; } }, - Execs.directExecutor() + Runnable::run ); publishWaitList.add(publishFuture); @@ -928,7 +927,7 @@ public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata) return null; } }, - Execs.directExecutor() + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 17906e42dba0..d81520b6ddbf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -905,7 +905,7 @@ private Map> getCurrentTotalStats() taskId, currentStats ), - Execs.directExecutor() + Runnable::run ) ); groupAndTaskIds.add(new Pair<>(groupId, taskId)); @@ -924,7 +924,7 @@ private Map> getCurrentTotalStats() taskId, currentStats ), - Execs.directExecutor() + Runnable::run ) ); groupAndTaskIds.add(new Pair<>(groupId, taskId)); @@ -1628,7 +1628,7 @@ public Void apply(@Nullable Boolean result) return null; } }, - Execs.directExecutor() + Runnable::run ); } @@ -1912,7 +1912,7 @@ public Map apply(@Nullable Object input) return null; } }, - Execs.directExecutor() + Runnable::run ); } @@ -2541,7 +2541,7 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept return null; }, - Execs.directExecutor() + Runnable::run ) ).collect(Collectors.toList()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java index b5ff50dc6a5d..441f6dc3b8c8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -202,7 +202,7 @@ public TaskStatus apply(TaskStatus taskStatus) } } }, - Execs.directExecutor() + Runnable::run ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 8b5550feaaf6..29745d76686d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -523,7 +523,7 @@ ListenableFuture dropInBackground(SegmentsAndMetadata segme metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata() ); }, - Execs.directExecutor() + Runnable::run ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 7c3f0dd0eb2f..8a575adfd9d3 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -285,7 +285,7 @@ public ListenableFuture publish( } return sam; }, - Execs.directExecutor() + Runnable::run ); } From c1982ad97dd09242b3596a768deaccfed1e510ca Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 29 Jan 2019 13:46:27 -0800 Subject: [PATCH 6/9] Retain some explicit casts --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index d81520b6ddbf..77530eae526f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -900,7 +900,8 @@ private Map> getCurrentTotalStats() futures.add( Futures.transform( taskClient.getMovingAveragesAsync(taskId), - (currentStats) -> new StatsFromTaskResult( + // Ambiguous call if not explicitly declared + (Function, StatsFromTaskResult>) (currentStats) -> new StatsFromTaskResult( groupId, taskId, currentStats @@ -919,7 +920,8 @@ private Map> getCurrentTotalStats() futures.add( Futures.transform( taskClient.getMovingAveragesAsync(taskId), - (currentStats) -> new StatsFromTaskResult( + // Ambiguous call if not explicitly declared + (Function, StatsFromTaskResult>) (currentStats) -> new StatsFromTaskResult( groupId, taskId, currentStats From 0fd0c55528b49c640b8aa975a351e0ca5131bd28 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 29 Jan 2019 14:23:17 -0800 Subject: [PATCH 7/9] Pass findbugs --- .../druid/collections/spatial/ImmutableRTreeTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java b/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java index cb02683ed8c6..7d0fa4fc0ecd 100644 --- a/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java +++ b/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.primitives.Bytes; -import junit.framework.Assert; import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.collections.bitmap.ConciseBitmapFactory; import org.apache.druid.collections.bitmap.ImmutableBitmap; @@ -35,6 +34,7 @@ import org.apache.druid.collections.spatial.split.LinearGutmanSplitStrategy; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.ImmutableRTreeObjectStrategy; +import org.junit.Assert; import org.junit.Test; import org.roaringbitmap.IntIterator; @@ -581,7 +581,7 @@ public void showBenchmarks() Iterable points = searchTree.search(new RadiusBound(new float[]{50, 50}, radius)); - Iterables.size(points); + Assert.assertNotNull(Iterables.size(points)); stop = stopwatch.elapsed(TimeUnit.MILLISECONDS); System.out.printf(Locale.ENGLISH, "[%,d]: search = %,dms%n", numPoints, stop); @@ -637,7 +637,7 @@ public void showBenchmarksBoundWithLimits() ) ); - Iterables.size(points); + Assert.assertNotNull(Iterables.size(points)); stop = stopwatch.elapsed(TimeUnit.MILLISECONDS); System.out.printf(Locale.ENGLISH, "[%,d]: search = %,dms%n", numPoints, stop); @@ -672,6 +672,6 @@ public void testToBytes() ImmutableRTree deserializedTree = genericIndexed.get(0); byte[] bytes2 = deserializedTree.toBytes(); - org.junit.Assert.assertEquals(Bytes.asList(bytes1), Bytes.asList(bytes2)); + Assert.assertEquals(Bytes.asList(bytes1), Bytes.asList(bytes2)); } } From 64698eef45ee8999733c86c948fd3ab4eb559033 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 29 Jan 2019 14:51:03 -0800 Subject: [PATCH 8/9] Fix some FindBugs complaints --- .../seekablestream/SeekableStreamIndexTaskRunner.java | 1 + .../druid/collections/spatial/ImmutableRTreeTest.java | 8 ++++---- .../segment/realtime/appenderator/AppenderatorImpl.java | 1 + 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 61439870f533..9d1704b9c6fa 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -889,6 +889,7 @@ private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) publishFuture, new FutureCallback() { + @SuppressWarnings("CheckReturnValue") @Override public void onSuccess(SegmentsAndMetadata publishedSegmentsAndMetadata) { diff --git a/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java b/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java index 7d0fa4fc0ecd..c9af7a7431d9 100644 --- a/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java +++ b/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java @@ -550,7 +550,7 @@ public void testSearchWithSplitLimitedBoundRoaring() } } - @SuppressWarnings("unused") // TODO rewrite to JMH and move to the benchmarks project + @SuppressWarnings({"unused", "CheckReturnValue"}) // TODO rewrite to JMH and move to the benchmarks project public void showBenchmarks() { final int start = 1; @@ -581,7 +581,7 @@ public void showBenchmarks() Iterable points = searchTree.search(new RadiusBound(new float[]{50, 50}, radius)); - Assert.assertNotNull(Iterables.size(points)); + Iterables.size(points); stop = stopwatch.elapsed(TimeUnit.MILLISECONDS); System.out.printf(Locale.ENGLISH, "[%,d]: search = %,dms%n", numPoints, stop); @@ -599,7 +599,7 @@ public void showBenchmarks() } } - @SuppressWarnings("unused") // TODO rewrite to JMH and move to the benchmarks project + @SuppressWarnings({"unused", "CheckReturnValue"}) // TODO rewrite to JMH and move to the benchmarks project public void showBenchmarksBoundWithLimits() { //final int start = 1; @@ -637,7 +637,7 @@ public void showBenchmarksBoundWithLimits() ) ); - Assert.assertNotNull(Iterables.size(points)); + Iterables.size(points); stop = stopwatch.elapsed(TimeUnit.MILLISECONDS); System.out.printf(Locale.ENGLISH, "[%,d]: search = %,dms%n", numPoints, stop); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 4cd0194a8bad..6945fdf2e316 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -641,6 +641,7 @@ public ListenableFuture push( * This is useful if we're going to do something that would otherwise potentially break currently in-progress * pushes. */ + @SuppressWarnings("CheckReturnValue") private ListenableFuture pushBarrier() { return intermediateTempExecutor.submit( From 5ea6c47b2e219ef94a10143a6ffc36febcb27aba Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 12 Feb 2019 09:27:49 -0800 Subject: [PATCH 9/9] Add references to commits where changes were made --- .../java/org/apache/druid/common/guava/GuavaUtils.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java index 9af134e00d0c..76283d38c6e0 100644 --- a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java +++ b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java @@ -35,13 +35,17 @@ import java.util.Arrays; /** - * + * This class contains a bunch of guava helper functions to bridge compatability problems across guava versions, or + * between guava and java standard libraries. Any version compatability related entries should have a reference to + * the critical commit(s). */ public class GuavaUtils { private static final CharMatcher BREAKING_WHITESPACE_INSTANCE; static { + // https://github.com/google/guava/commit/4fbb165ebf208d75100d5d47f56750d247f7d181 + // Since v19.0 CharMatcher matcher; try { final Method m = CharMatcher.class.getDeclaredMethod("breakingWhitespace"); @@ -151,6 +155,8 @@ private static T tryMethods(Object object, Class assignableTo, String... */ public static String getHostText(HostAndPort hostAndPort) { + // https://github.com/google/guava/commit/b0babb69b05ed4d15cce74635ae96cf8ba78c85f + // Change started in v20.0 return tryMethods(hostAndPort, String.class, "getHostText", "getHost"); }