diff --git a/.travis.yml b/.travis.yml index 3a871a3dca7b..f05db3da9d62 100644 --- a/.travis.yml +++ b/.travis.yml @@ -41,12 +41,12 @@ matrix: install: true script: MAVEN_OPTS='-Xmx3000m' mvn clean verify -Prat -DskipTests -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -Drat.consoleOutput=true - # strict compilation + # strict compilation and future guava compat - env: - - NAME="strict compilation" + - NAME="strict compilation and guava checks" install: true # Strict compilation requires more than 2 GB - script: MAVEN_OPTS='-Xmx3000m' mvn clean -Pstrict -pl '!benchmarks' compile test-compile -B --fail-at-end + script: MAVEN_OPTS='-Xmx3000m' mvn clean -Pstrict -Precent-guava -pl '!benchmarks' compile test-compile -B --fail-at-end # processing module test - env: diff --git a/codestyle/druid-forbidden-apis.txt b/codestyle/druid-forbidden-apis.txt index 5eb5256a2680..9d47a05bde95 100644 --- a/codestyle/druid-forbidden-apis.txt +++ b/codestyle/druid-forbidden-apis.txt @@ -34,7 +34,12 @@ java.util.Random#() @ Use ThreadLocalRandom.current() or the constructor w java.lang.Math#random() @ Use ThreadLocalRandom.current() java.util.regex.Pattern#matches(java.lang.String,java.lang.CharSequence) @ Use String.startsWith(), endsWith(), contains(), or compile and cache a Pattern explicitly org.apache.commons.io.FileUtils#getTempDirectory() @ Use org.junit.rules.TemporaryFolder for tests instead - +com.google.common.net.HostAndPort#getHostText() @ Use org.apache.druid.common.guava.GuavaUtils#getHostText instead +com.google.common.base.CharMatcher.BREAKING_WHITESPACE @ Use org.apache.druid.common.guava.GuavaUtils#breakingWhitespace() +com.google.common.base.CharMatcher#breakingWhitespace() @ Use org.apache.druid.common.guava.GuavaUtils#breakingWhitespace() +com.google.common.base.Objects#toStringHelper(java.lang.Object) @ Deprecated in future guava +com.google.common.base.Objects#toStringHelper(java.lang.Class) @ Deprecated in future guava +com.google.common.base.Objects#toStringHelper(java.lang.String) @ Deprecated in future guava @defaultMessage For performance reasons, use the utf8Base64 / encodeBase64 / encodeBase64String / decodeBase64 / decodeBase64String methods in StringUtils org.apache.commons.codec.binary.Base64 com.google.common.io.BaseEncoding.base64 \ No newline at end of file diff --git a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java index 47d996033290..76283d38c6e0 100644 --- a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java +++ b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java @@ -19,16 +19,58 @@ package org.apache.druid.common.guava; +import com.google.common.base.CharMatcher; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.net.HostAndPort; import com.google.common.primitives.Longs; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; +import javax.el.MethodNotFoundException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; /** + * This class contains a bunch of guava helper functions to bridge compatability problems across guava versions, or + * between guava and java standard libraries. Any version compatability related entries should have a reference to + * the critical commit(s). */ public class GuavaUtils { + private static final CharMatcher BREAKING_WHITESPACE_INSTANCE; + + static { + // https://github.com/google/guava/commit/4fbb165ebf208d75100d5d47f56750d247f7d181 + // Since v19.0 + CharMatcher matcher; + try { + final Method m = CharMatcher.class.getDeclaredMethod("breakingWhitespace"); + matcher = (CharMatcher) m.invoke(null); + } + catch (IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException("Failed to fetch breakingWhitespace", e); + } + catch (NoSuchMethodException e) { + try { + final Field f = CharMatcher.class.getDeclaredField("BREAKING_WHITESPACE"); + matcher = (CharMatcher) f.get(null); + } + catch (IllegalAccessException e1) { + throw new IllegalStateException("Failed to access BREAKING_WHITESPACE", e1); + } + catch (NoSuchFieldException e1) { + throw new IllegalStateException("Cannot find breaking white space in guava", e1); + } + } + if (matcher == null) { + throw new IllegalStateException("wtf!?"); + } + BREAKING_WHITESPACE_INSTANCE = matcher; + } /** * To fix semantic difference of Longs.tryParse() from Long.parseLong (Longs.tryParse() returns null for '+' started value) @@ -62,4 +104,72 @@ public static > T getEnumIfPresent(final Class enumClass, f return null; } + + /** + * Try the various methods (zero arguments) against the object. This is handy for maintaining guava compatability + * + * @param object The object to call the methods on + * @param methods The sequence of methods to call + * @param The return type + * + * @return The result of invoking the method on the object + */ + private static T tryMethods(Object object, Class assignableTo, String... methods) + { + for (String method : methods) { + try { + final Method m = object.getClass().getDeclaredMethod(method); + if (!assignableTo.isAssignableFrom(m.getReturnType())) { + throw new IAE( + "Cannot assign [%s] to [%s] in [%s] from [%s]", + m.getReturnType(), + assignableTo, + m, + object.getClass() + ); + } + try { + return (T) m.invoke(object); + } + catch (IllegalAccessException | InvocationTargetException e) { + throw new IAE("Failed to invoke [%s] on [%s]", m, object); + } + } + catch (NoSuchMethodException e) { + // Keep going + } + } + throw new MethodNotFoundException(StringUtils.format( + "Unable to find methods %s in [%s]", + Arrays.toString(methods), + object.getClass() + )); + } + + /** + * Get the host portion of the {@link HostAndPort} that has the host's text. Changes in different guava versions + * + * @param hostAndPort The object to pull from + * + * @return The host portion of the host and port + */ + public static String getHostText(HostAndPort hostAndPort) + { + // https://github.com/google/guava/commit/b0babb69b05ed4d15cce74635ae96cf8ba78c85f + // Change started in v20.0 + return tryMethods(hostAndPort, String.class, "getHostText", "getHost"); + } + + /** + * Returns the instance of the breaking whitespace char matcher in guava. + * + * This moved starting in guava 19 + * + * @return `CharMatcher.BREAKING_WHITESPACE` or `CharMatcher.breakingWhitespace()` depending on whichever works + */ + + public static CharMatcher breakingWhitespace() + { + return BREAKING_WHITESPACE_INSTANCE; + } } diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java index 6b90fef06171..a8186f51c1ab 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java @@ -22,9 +22,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import java.util.Objects; + /** * Represents the status of a task from the perspective of the coordinator. The task may be ongoing * ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true). @@ -178,12 +179,12 @@ public TaskStatus withDuration(long _duration) @Override public String toString() { - return Objects.toStringHelper(this) - .add("id", id) - .add("status", status) - .add("duration", duration) - .add("errorMsg", errorMsg) - .toString(); + return "TaskStatus{" + + "id='" + id + '\'' + + ", status=" + status + + ", duration=" + duration + + ", errorMsg='" + errorMsg + '\'' + + '}'; } @Override @@ -196,15 +197,15 @@ public boolean equals(Object o) return false; } TaskStatus that = (TaskStatus) o; - return getDuration() == that.getDuration() && - java.util.Objects.equals(getId(), that.getId()) && + return duration == that.duration && + id.equals(that.id) && status == that.status && - java.util.Objects.equals(getErrorMsg(), that.getErrorMsg()); + Objects.equals(errorMsg, that.errorMsg); } @Override public int hashCode() { - return java.util.Objects.hash(getId(), status, getDuration(), getErrorMsg()); + return Objects.hash(id, status, duration, errorMsg); } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java index 1722e4a9c663..73f1ea6975f2 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java +++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java @@ -42,18 +42,32 @@ public static ListenableFuture transformAsync( ) { final SettableFuture finalFuture = SettableFuture.create(); - Futures.addCallback(inFuture, new FutureCallback() - { - @Override - public void onSuccess(@Nullable I result) - { - final ListenableFuture transformFuture = transform.apply(result); - Futures.addCallback(transformFuture, new FutureCallback() + Futures.addCallback( + inFuture, + new FutureCallback() { @Override - public void onSuccess(@Nullable O result) + public void onSuccess(@Nullable I result) { - finalFuture.set(result); + final ListenableFuture transformFuture = transform.apply(result); + Futures.addCallback( + transformFuture, + new FutureCallback() + { + @Override + public void onSuccess(@Nullable O result) + { + finalFuture.set(result); + } + + @Override + public void onFailure(Throwable t) + { + finalFuture.setException(t); + } + }, + Runnable::run + ); } @Override @@ -61,15 +75,9 @@ public void onFailure(Throwable t) { finalFuture.setException(t); } - }); - } - - @Override - public void onFailure(Throwable t) - { - finalFuture.setException(t); - } - }); + }, + Runnable::run + ); return finalFuture; } } diff --git a/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java b/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java index 6bd764f0a5de..c043e7b46622 100644 --- a/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java +++ b/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java @@ -19,19 +19,13 @@ package org.apache.druid.common.guava; +import com.google.common.net.HostAndPort; import com.google.common.primitives.Longs; import org.junit.Assert; import org.junit.Test; public class GuavaUtilsTest { - enum MyEnum - { - ONE, - TWO, - BUCKLE_MY_SHOE - } - @Test public void testParseLong() { @@ -53,4 +47,25 @@ public void testGetEnumIfPresent() Assert.assertEquals(MyEnum.BUCKLE_MY_SHOE, GuavaUtils.getEnumIfPresent(MyEnum.class, "BUCKLE_MY_SHOE")); Assert.assertEquals(null, GuavaUtils.getEnumIfPresent(MyEnum.class, "buckle_my_shoe")); } + + @Test + public void testHostAndPorthostText() + { + Assert.assertEquals("localhost", GuavaUtils.getHostText(HostAndPort.fromString("localhost"))); + Assert.assertEquals("127.0.0.1", GuavaUtils.getHostText(HostAndPort.fromString("127.0.0.1"))); + Assert.assertEquals("::1", GuavaUtils.getHostText(HostAndPort.fromString("::1"))); + } + + @Test + public void testBreakingWhitespaceExists() + { + Assert.assertNotNull(GuavaUtils.breakingWhitespace()); + } + + enum MyEnum + { + ONE, + TWO, + BUCKLE_MY_SHOE + } } diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java index 038fb2db90f4..083dbbd7e172 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java +++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java @@ -38,6 +38,7 @@ import kafka.javaapi.TopicMetadataResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.common.logger.Logger; @@ -62,31 +63,29 @@ public class KafkaSimpleConsumer public static final List EMPTY_MSGS = new ArrayList<>(); private static final Logger log = new Logger(KafkaSimpleConsumer.class); - + private static final int SO_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30); + private static final int BUFFER_SIZE = 65536; + private static final long RETRY_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(1); + private static final int FETCH_SIZE = 100_000_000; private final List allBrokers; private final String topic; private final int partitionId; private final String clientId; private final String leaderLookupClientId; private final boolean earliest; - private volatile Broker leaderBroker; private List replicaBrokers; private SimpleConsumer consumer = null; - private static final int SO_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30); - private static final int BUFFER_SIZE = 65536; - private static final long RETRY_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(1); - private static final int FETCH_SIZE = 100_000_000; - public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List brokers, boolean earliest) { List brokerList = new ArrayList<>(); for (String broker : brokers) { HostAndPort brokerHostAndPort = HostAndPort.fromString(broker); + final String hostText = GuavaUtils.getHostText(brokerHostAndPort); Preconditions.checkArgument( - brokerHostAndPort.getHostText() != null && - !brokerHostAndPort.getHostText().isEmpty() && + hostText != null && + !hostText.isEmpty() && brokerHostAndPort.hasPort(), "kafka broker [%s] is not valid, must be :", broker @@ -126,35 +125,6 @@ private void ensureConsumer(Broker leader) throws InterruptedException } } - public static class BytesMessageWithOffset - { - final byte[] msg; - final long offset; - final int partition; - - public BytesMessageWithOffset(byte[] msg, long offset, int partition) - { - this.msg = msg; - this.offset = offset; - this.partition = partition; - } - - public int getPartition() - { - return partition; - } - - public byte[] message() - { - return msg; - } - - public long offset() - { - return offset; - } - } - private Iterable filterAndDecode(Iterable kafkaMessages, final long offset) { return FunctionalIterable @@ -304,7 +274,13 @@ private PartitionMetadata findLeader() throws InterruptedException SimpleConsumer consumer = null; try { log.info("Finding new leader from Kafka brokers, try broker [%s]", broker.toString()); - consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, leaderLookupClientId); + consumer = new SimpleConsumer( + GuavaUtils.getHostText(broker), + broker.getPort(), + SO_TIMEOUT_MILLIS, + BUFFER_SIZE, + leaderLookupClientId + ); TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic))); List metaData = resp.topicsMetadata(); @@ -388,4 +364,33 @@ private void ensureNotInterrupted(Exception e) throws InterruptedException throw new InterruptedException(); } } + + public static class BytesMessageWithOffset + { + final byte[] msg; + final long offset; + final int partition; + + public BytesMessageWithOffset(byte[] msg, long offset, int partition) + { + this.msg = msg; + this.offset = offset; + this.partition = partition; + } + + public int getPartition() + { + return partition; + } + + public byte[] message() + { + return msg; + } + + public long offset() + { + return offset; + } + } } diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java index 01b3dd2aff49..14201a2d5aa1 100644 --- a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java +++ b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java @@ -262,7 +262,7 @@ public void onFailure(Throwable t) } } }, - Execs.directExecutor() + Runnable::run ); this.future = future; final Stopwatch stopwatch = Stopwatch.createStarted(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java index 865b053b95c4..dbc67334c9c5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java @@ -172,14 +172,14 @@ public int hashCode() @Override public String toString() { - return Objects.toStringHelper(this) - .add("type", type) - .add("groupId", groupId) - .add("dataSource", dataSource) - .add("interval", interval) - .add("version", version) - .add("priority", priority) - .add("revoked", revoked) - .toString(); + return "TaskLock{" + + "type=" + type + + ", groupId='" + groupId + '\'' + + ", dataSource='" + dataSource + '\'' + + ", interval=" + interval + + ", version='" + version + '\'' + + ", priority=" + priority + + ", revoked=" + revoked + + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTaskBase.java index 2718be043863..ccef1f0de4d3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTaskBase.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Joiner; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; @@ -61,6 +60,7 @@ import java.util.stream.Collectors; /** + * */ public abstract class MergeTaskBase extends AbstractFixedIntervalTask { @@ -114,6 +114,80 @@ public boolean apply(@Nullable DataSegment segment) this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; } + private static String computeProcessingID(final String dataSource, final List segments) + { + final String segmentIDs = Joiner.on("_").join( + Iterables.transform( + Ordering.natural().sortedCopy(segments), new Function() + { + @Override + public String apply(DataSegment x) + { + return StringUtils.format( + "%s_%s_%s_%s", + x.getInterval().getStart(), + x.getInterval().getEnd(), + x.getVersion(), + x.getShardSpec().getPartitionNum() + ); + } + } + ) + ); + + return StringUtils.format( + "%s_%s", + dataSource, + Hashing.sha1().hashString(segmentIDs, StandardCharsets.UTF_8).toString() + ); + } + + private static Interval computeMergedInterval(final List segments) + { + Preconditions.checkArgument(segments.size() > 0, "segments.size() > 0"); + + DateTime start = null; + DateTime end = null; + + for (final DataSegment segment : segments) { + if (start == null || segment.getInterval().getStart().isBefore(start)) { + start = segment.getInterval().getStart(); + } + + if (end == null || segment.getInterval().getEnd().isAfter(end)) { + end = segment.getInterval().getEnd(); + } + } + + return new Interval(start, end); + } + + private static DataSegment computeMergedSegment( + final String dataSource, + final String version, + final List segments + ) + { + final Interval mergedInterval = computeMergedInterval(segments); + final Set mergedDimensions = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + final Set mergedMetrics = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + + for (DataSegment segment : segments) { + mergedDimensions.addAll(segment.getDimensions()); + mergedMetrics.addAll(segment.getMetrics()); + } + + return DataSegment.builder() + .dataSource(dataSource) + .interval(mergedInterval) + .version(version) + .binaryVersion(IndexIO.CURRENT_VERSION_ID) + .shardSpec(NoneShardSpec.instance()) + .dimensions(Lists.newArrayList(mergedDimensions)) + .metrics(Lists.newArrayList(mergedMetrics)) + .build(); + } + protected void verifyInputSegments(List segments) { // Verify segments are all unsharded @@ -251,86 +325,9 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() @Override public String toString() { - return Objects.toStringHelper(this) - .add("id", getId()) - .add("dataSource", getDataSource()) - .add("interval", getInterval()) - .add("segments", segments) - .add("segmentWriteOutMediumFactory", segmentWriteOutMediumFactory) - .toString(); - } - - private static String computeProcessingID(final String dataSource, final List segments) - { - final String segmentIDs = Joiner.on("_").join( - Iterables.transform( - Ordering.natural().sortedCopy(segments), new Function() - { - @Override - public String apply(DataSegment x) - { - return StringUtils.format( - "%s_%s_%s_%s", - x.getInterval().getStart(), - x.getInterval().getEnd(), - x.getVersion(), - x.getShardSpec().getPartitionNum() - ); - } - } - ) - ); - - return StringUtils.format( - "%s_%s", - dataSource, - Hashing.sha1().hashString(segmentIDs, StandardCharsets.UTF_8).toString() - ); - } - - private static Interval computeMergedInterval(final List segments) - { - Preconditions.checkArgument(segments.size() > 0, "segments.size() > 0"); - - DateTime start = null; - DateTime end = null; - - for (final DataSegment segment : segments) { - if (start == null || segment.getInterval().getStart().isBefore(start)) { - start = segment.getInterval().getStart(); - } - - if (end == null || segment.getInterval().getEnd().isAfter(end)) { - end = segment.getInterval().getEnd(); - } - } - - return new Interval(start, end); - } - - private static DataSegment computeMergedSegment( - final String dataSource, - final String version, - final List segments - ) - { - final Interval mergedInterval = computeMergedInterval(segments); - final Set mergedDimensions = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); - final Set mergedMetrics = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); - - for (DataSegment segment : segments) { - mergedDimensions.addAll(segment.getDimensions()); - mergedMetrics.addAll(segment.getMetrics()); - } - - return DataSegment.builder() - .dataSource(dataSource) - .interval(mergedInterval) - .version(version) - .binaryVersion(IndexIO.CURRENT_VERSION_ID) - .shardSpec(NoneShardSpec.instance()) - .dimensions(Lists.newArrayList(mergedDimensions)) - .metrics(Lists.newArrayList(mergedMetrics)) - .build(); + return "MergeTaskBase{" + + "segments=" + segments + + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java index e1919ec56099..de7544390e77 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java @@ -436,7 +436,8 @@ public void onFailure(Throwable t) log.error(t, "Error while running a task for subTaskSpec[%s]", spec); taskCompleteEvents.offer(SubTaskCompleteEvent.fail(spec, t)); } - } + }, + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index d3199834c635..a1effc4a9a01 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -42,6 +42,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import org.apache.commons.io.FileUtils; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; @@ -841,7 +842,7 @@ public boolean matches(char c) if (inQuotes) { return false; } - return CharMatcher.BREAKING_WHITESPACE.matches(c); + return GuavaUtils.breakingWhitespace().matches(c); } } ).omitEmptyStrings().split(string).iterator(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index b83ea673c9f3..4efb0a80434d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -264,7 +264,8 @@ public void onFailure(Throwable throwable) waitingForMonitor.notifyAll(); } } - } + }, + Runnable::run ); break; case CHILD_UPDATED: @@ -1175,7 +1176,8 @@ public void onFailure(Throwable t) { removedWorkerCleanups.remove(worker, cleanupTask); } - } + }, + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 787852ddfbd5..c14f0425723b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -196,7 +196,7 @@ public int compare(Pair left, Pair right) * @param interval interval to lock * * @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a - * {@link LockResult#revoked} flag. + * {@link LockResult#isRevoked} flag. * * @throws InterruptedException if the current thread is interrupted */ @@ -231,7 +231,7 @@ public LockResult lock( * @param timeoutMs maximum time to wait * * @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a - * {@link LockResult#revoked} flag. + * {@link LockResult#isRevoked} flag. * * @throws InterruptedException if the current thread is interrupted */ @@ -268,7 +268,7 @@ public LockResult lock( * @param interval interval to lock * * @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a - * {@link LockResult#revoked} flag. + * {@link LockResult#isRevoked} flag. * * @throws IllegalStateException if the task is not a valid active task */ @@ -1091,10 +1091,10 @@ public int hashCode() @Override public String toString() { - return Objects.toStringHelper(this) - .add("taskLock", taskLock) - .add("taskIds", taskIds) - .toString(); + return "TaskLockPosse{" + + "taskLock=" + taskLock + + ", taskIds=" + taskIds + + '}'; } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index afcbdb1f5ada..fd7824f03981 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -534,7 +534,8 @@ private void handleStatus(final TaskStatus status) .emit(); } } - } + }, + Runnable::run ); return statusFuture; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 5cb99ba569a1..43a96f2a7bb0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -623,7 +623,8 @@ public void onFailure(Throwable t) { removedWorkerCleanups.remove(workerHostAndPort, cleanupTask); } - } + }, + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 857264965f50..8c187d65f78e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -621,7 +621,8 @@ public void onFailure(Throwable t) log.error("Persist failed, dying"); backgroundThreadException = t; } - } + }, + Runnable::run ); } } @@ -876,7 +877,8 @@ private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) } else { return publishedSegmentsAndMetadata; } - } + }, + Runnable::run ); publishWaitList.add(publishFuture); @@ -888,6 +890,7 @@ private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) publishFuture, new FutureCallback() { + @SuppressWarnings("CheckReturnValue") @Override public void onSuccess(SegmentsAndMetadata publishedSegmentsAndMetadata) { @@ -925,7 +928,8 @@ public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata) handoffFuture.set(handoffSegmentsAndMetadata); return null; } - } + }, + Runnable::run ); } @@ -935,7 +939,8 @@ public void onFailure(Throwable t) log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata); handoffFuture.setException(t); } - } + }, + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 25250ac0487c..77530eae526f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -900,11 +900,13 @@ private Map> getCurrentTotalStats() futures.add( Futures.transform( taskClient.getMovingAveragesAsync(taskId), + // Ambiguous call if not explicitly declared (Function, StatsFromTaskResult>) (currentStats) -> new StatsFromTaskResult( groupId, taskId, currentStats - ) + ), + Runnable::run ) ); groupAndTaskIds.add(new Pair<>(groupId, taskId)); @@ -918,11 +920,13 @@ private Map> getCurrentTotalStats() futures.add( Futures.transform( taskClient.getMovingAveragesAsync(taskId), + // Ambiguous call if not explicitly declared (Function, StatsFromTaskResult>) (currentStats) -> new StatsFromTaskResult( groupId, taskId, currentStats - ) + ), + Runnable::run ) ); groupAndTaskIds.add(new Pair<>(groupId, taskId)); @@ -1625,7 +1629,8 @@ public Void apply(@Nullable Boolean result) } return null; } - } + }, + Runnable::run ); } @@ -1908,7 +1913,8 @@ public Map apply(@Nullable Object input) { return null; } - } + }, + Runnable::run ); } @@ -2536,7 +2542,8 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept } return null; - } + }, + Runnable::run ) ).collect(Collectors.toList()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 613c69b3a2e1..14fdca5f564a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -247,7 +247,8 @@ public void onFailure(Throwable t) { submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId()))); } - } + }, + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java index d3b531a3a464..441f6dc3b8c8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -201,7 +201,8 @@ public TaskStatus apply(TaskStatus taskStatus) throw Throwables.propagate(e); } } - } + }, + Runnable::run ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java index 85f661095989..62e1392e249d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java @@ -191,7 +191,8 @@ public void onFailure(Throwable th) log.debug(ex, "Request timed out or closed already."); } } - } + }, + Runnable::run ); asyncContext.setTimeout(timeout); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java index 41a4366feac9..4e3841399ee7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java @@ -228,7 +228,8 @@ public void onFailure(Throwable t) { runningItems.remove(taskRunnerWorkItem); } - } + }, + Runnable::run ); return statusFuture; diff --git a/pom.xml b/pom.xml index 6c6b1945f6f2..4d026ca0f30e 100644 --- a/pom.xml +++ b/pom.xml @@ -1398,6 +1398,19 @@ + + + recent-guava + + 27.0.1-jre + + + + hadoop-guava + + 11.0.2 + + diff --git a/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java b/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java index cb02683ed8c6..c9af7a7431d9 100644 --- a/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java +++ b/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.primitives.Bytes; -import junit.framework.Assert; import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.collections.bitmap.ConciseBitmapFactory; import org.apache.druid.collections.bitmap.ImmutableBitmap; @@ -35,6 +34,7 @@ import org.apache.druid.collections.spatial.split.LinearGutmanSplitStrategy; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.data.ImmutableRTreeObjectStrategy; +import org.junit.Assert; import org.junit.Test; import org.roaringbitmap.IntIterator; @@ -550,7 +550,7 @@ public void testSearchWithSplitLimitedBoundRoaring() } } - @SuppressWarnings("unused") // TODO rewrite to JMH and move to the benchmarks project + @SuppressWarnings({"unused", "CheckReturnValue"}) // TODO rewrite to JMH and move to the benchmarks project public void showBenchmarks() { final int start = 1; @@ -599,7 +599,7 @@ public void showBenchmarks() } } - @SuppressWarnings("unused") // TODO rewrite to JMH and move to the benchmarks project + @SuppressWarnings({"unused", "CheckReturnValue"}) // TODO rewrite to JMH and move to the benchmarks project public void showBenchmarksBoundWithLimits() { //final int start = 1; @@ -672,6 +672,6 @@ public void testToBytes() ImmutableRTree deserializedTree = genericIndexed.get(0); byte[] bytes2 = deserializedTree.toBytes(); - org.junit.Assert.assertEquals(Bytes.asList(bytes1), Bytes.asList(bytes2)); + Assert.assertEquals(Bytes.asList(bytes1), Bytes.asList(bytes2)); } } diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 7f73abf97c8f..1c57cf90f7f1 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -504,7 +504,8 @@ public void onFailure(Throwable t) } } } - } + }, + Runnable::run ); } catch (IOException e) { diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index be0811769f2c..d09e34f2a2b5 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -29,6 +29,7 @@ import com.google.common.collect.Maps; import com.google.common.net.HostAndPort; import com.google.inject.Inject; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DiscoveryDruidNode; @@ -100,18 +101,14 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer * DataSources and not maintaining any segment information of other DataSources in memory. */ private final Predicate> defaultFilter; - private volatile Predicate> finalPredicate; - // For each queryable server, a name -> DruidServerHolder entry is kept private final ConcurrentHashMap servers = new ConcurrentHashMap<>(); - - private volatile ScheduledExecutorService executor; - private final HttpClient httpClient; private final ObjectMapper smileMapper; private final HttpServerInventoryViewConfig config; - private final CountDownLatch inventoryInitializationLatch = new CountDownLatch(1); + private volatile Predicate> finalPredicate; + private volatile ScheduledExecutorService executor; @Inject public HttpServerInventoryView( @@ -511,7 +508,12 @@ private class DruidServerHolder smileMapper, httpClient, executor, - new URL(druidServer.getScheme(), hostAndPort.getHostText(), hostAndPort.getPort(), "/"), + new URL( + druidServer.getScheme(), + GuavaUtils.getHostText(hostAndPort), + hostAndPort.getPort(), + "/" + ), "/druid-internal/v1/segments", SEGMENT_LIST_RESP_TYPE_REF, config.getServerTimeout(), diff --git a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java index 0e78e994fd04..b6888fe66f56 100644 --- a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java +++ b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java @@ -27,7 +27,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.logger.Logger; @@ -102,7 +101,7 @@ public void onFailure(Throwable t) exec ); }, - Execs.directExecutor() + Runnable::run ); } diff --git a/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java b/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java index b9348731bdae..25f6070720d1 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java +++ b/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; /** @@ -30,13 +29,6 @@ */ public class QueryStatus { - public enum Status - { - RUNNING, - SUCCESS, - FAILED - } - private final String id; private final Status status; private final long duration; @@ -53,6 +45,16 @@ public QueryStatus( this.duration = duration; } + @Override + public String toString() + { + return "QueryStatus{" + + "id='" + id + '\'' + + ", status=" + status + + ", duration=" + duration + + '}'; + } + @JsonProperty("id") public String getId() { @@ -77,13 +79,10 @@ public boolean isComplete() return status != Status.RUNNING; } - @Override - public String toString() + public enum Status { - return Objects.toStringHelper(this) - .add("id", id) - .add("status", status) - .add("duration", duration) - .toString(); + RUNNING, + SUCCESS, + FAILED } } diff --git a/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java b/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java index cb506876cd36..074c04595072 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java +++ b/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java @@ -21,10 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import org.apache.druid.indexer.TaskState; +import java.util.Objects; + /** * Should be synced with org.apache.druid.indexing.common.TaskStatus */ @@ -68,6 +69,16 @@ public long getDuration() return duration; } + @Override + public String toString() + { + return "TaskStatus{" + + "id='" + id + '\'' + + ", status=" + status + + ", duration=" + duration + + '}'; + } + @Override public boolean equals(Object o) { @@ -79,23 +90,13 @@ public boolean equals(Object o) } TaskStatus that = (TaskStatus) o; return duration == that.duration && - java.util.Objects.equals(id, that.id) && + id.equals(that.id) && status == that.status; } @Override public int hashCode() { - return java.util.Objects.hash(id, status, duration); - } - - @Override - public String toString() - { - return Objects.toStringHelper(this) - .add("id", id) - .add("status", status) - .add("duration", duration) - .toString(); + return Objects.hash(id, status, duration); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 351013474c1e..0ae36dd73750 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -322,7 +322,8 @@ public void onFailure(Throwable t) { persistError = t; } - } + }, + Runnable::run ); } else { isPersistRequired = true; @@ -647,6 +648,7 @@ public ListenableFuture push( * This is useful if we're going to do something that would otherwise potentially break currently in-progress * pushes. */ + @SuppressWarnings("CheckReturnValue") private ListenableFuture pushBarrier() { return intermediateTempExecutor.submit( diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java index 8eb27db0dd38..afea2c6fadcb 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java @@ -330,7 +330,8 @@ public void onFailure(Throwable e) // TODO: Retry? log.warn(e, "Failed to drop segment: %s", identifier); } - } + }, + Runnable::run ); } @@ -483,7 +484,8 @@ public void onFailure(Throwable e) log.warn(e, "Failed to push [%,d] segments.", segmentsToPush.size()); errorHandler.apply(e); } - } + }, + Runnable::run ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index cb6ba9085a7f..6266d7298fc9 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -522,7 +522,8 @@ ListenableFuture dropInBackground(SegmentsAndMetadata segme segmentsAndMetadata.getSegments(), metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata() ); - } + }, + Runnable::run ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 2599387dd158..8a575adfd9d3 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -284,7 +284,8 @@ public ListenableFuture publish( sequenceNames.forEach(segments::remove); } return sam; - } + }, + Runnable::run ); } @@ -363,7 +364,8 @@ public void onFailure(Throwable e) numRemainingHandoffSegments.decrementAndGet(); resultFuture.setException(e); } - } + }, + Runnable::run ); } ); diff --git a/server/src/main/java/org/apache/druid/server/DruidNode.java b/server/src/main/java/org/apache/druid/server/DruidNode.java index 8454dc673948..827026a6e8e1 100644 --- a/server/src/main/java/org/apache/druid/server/DruidNode.java +++ b/server/src/main/java/org/apache/druid/server/DruidNode.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; import com.google.inject.name.Named; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.common.utils.SocketUtil; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -146,7 +147,7 @@ private void init(String serviceName, String host, boolean bindOnHost, Integer p Integer portFromHostConfig; if (host != null) { hostAndPort = HostAndPort.fromString(host); - host = hostAndPort.getHostText(); + host = GuavaUtils.getHostText(hostAndPort); portFromHostConfig = hostAndPort.hasPort() ? hostAndPort.getPort() : null; if (plainTextPort != null && portFromHostConfig != null && !plainTextPort.equals(portFromHostConfig)) { throw new IAE("Conflicting host:port [%s] and port [%d] settings", host, plainTextPort); diff --git a/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java b/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java index 428a51d14018..35fbe8cd2d87 100644 --- a/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java +++ b/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.StringUtils; public class HostAndPortWithScheme @@ -69,7 +70,7 @@ public String getScheme() public String getHostText() { - return hostAndPort.getHostText(); + return GuavaUtils.getHostText(hostAndPort); } public int getPort() diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java index 3a531b7d384d..619d7703a844 100644 --- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java +++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java @@ -204,7 +204,8 @@ public void onFailure(Throwable th) log.debug(ex, "Request timed out or closed already."); } } - } + }, + Runnable::run ); asyncContext.setTimeout(timeout); @@ -315,7 +316,8 @@ public void onFailure(Throwable th) log.debug(ex, "Request timed out or closed already."); } } - } + }, + Runnable::run ); asyncContext.setTimeout(timeout); diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java index 0f90c83935c8..da04e6efac48 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java @@ -406,7 +406,8 @@ public void onFailure(Throwable t) LOG.makeAlert(t, "Background lookup manager exited with error!").emit(); } } - } + }, + Runnable::run ); LOG.debug("Started"); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index e1b7a886d645..b141317bd4d1 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -372,7 +372,8 @@ public void onFailure(Throwable t) { pair.lhs.setException(t); } - } + }, + Runnable::run ); } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index 79b575434bc9..dde7af8d34c1 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -20,7 +20,6 @@ package org.apache.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -28,6 +27,7 @@ import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; @@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.ListenableFutures; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; @@ -489,9 +490,13 @@ public ListenableFuture push( ) ) .collect(Collectors.toList()); - return Futures.transform( + return ListenableFutures.transformAsync( persistAll(committer), - (Function) commitMetadata -> new SegmentsAndMetadata(segments, commitMetadata) + input -> { + final SettableFuture future = SettableFuture.create(); + future.set(new SegmentsAndMetadata(segments, input)); + return future; + } ); } else { if (interruptPush) { diff --git a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java index d513aebb92d3..082007591b59 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java @@ -145,7 +145,8 @@ public void onFailure(Throwable t) { callbackExcecuted.set(true); } - } + }, + Runnable::run ); future.cancel(true); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index e895113f8505..e021295b4033 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -48,6 +48,7 @@ import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.RE; @@ -709,7 +710,7 @@ private static String extractHost(@Nullable final String hostAndPort) return null; } - return HostAndPort.fromString(hostAndPort).getHostText(); + return GuavaUtils.getHostText(HostAndPort.fromString(hostAndPort)); } private static int extractPort(@Nullable final String hostAndPort)