From f4deddf4c45f1a543bab741e65c0fde623249022 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Wed, 21 Feb 2018 22:46:26 -0800 Subject: [PATCH 01/15] Future-proof some Guava usage * Use a java-util EmptyIterator instead of Guava's * Change some of the guava future handling to do manual async transforms. Guava changes transform into transformAsync by deprecating transform in ONLY Guava 19. Then its gone in 20 --- .../druid/common/utils/SerializerUtils.java | 10 +-- .../rocketmq/RocketMQFirehoseFactory.java | 4 +- ...fkaEightSimpleConsumerFirehoseFactory.java | 10 +-- .../rabbitmq/RabbitMQFirehoseFactory.java | 4 +- .../kafka/KafkaEightFirehoseFactory.java | 4 +- .../main/java/io/druid/indexer/JobHelper.java | 30 +++----- .../AppenderatorDriverRealtimeIndexTask.java | 41 ++++++++-- .../common/tasklogs/FileTaskLogsTest.java | 2 +- .../util/common/collect/EmptyIterator.java | 59 +++++++++++++++ pom.xml | 5 ++ processing/pom.xml | 4 + .../query/groupby/GroupByQueryEngine.java | 4 +- .../epinephelinae/BufferHashGrouper.java | 4 +- .../LimitedBufferHashGrouper.java | 7 +- .../query/groupby/orderby/TopNSequence.java | 4 +- .../druid/client/CachingClusteredClient.java | 20 +++-- .../io/druid/client/CachingQueryRunner.java | 4 +- .../appenderator/BatchAppenderatorDriver.java | 55 ++++++++++---- .../StreamAppenderatorDriver.java | 75 +++++++++++++------ .../realtime/firehose/IrcFirehoseFactory.java | 4 +- 20 files changed, 249 insertions(+), 101 deletions(-) create mode 100644 java-util/src/main/java/io/druid/java/util/common/collect/EmptyIterator.java diff --git a/common/src/main/java/io/druid/common/utils/SerializerUtils.java b/common/src/main/java/io/druid/common/utils/SerializerUtils.java index 45abf9814f69..78d1edd8e77a 100644 --- a/common/src/main/java/io/druid/common/utils/SerializerUtils.java +++ b/common/src/main/java/io/druid/common/utils/SerializerUtils.java @@ -20,7 +20,6 @@ package io.druid.common.utils; import com.google.common.io.ByteStreams; -import com.google.common.io.OutputSupplier; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import io.druid.io.Channels; @@ -62,13 +61,6 @@ public void writeString(T out, String name) throws IOEx out.write(nameBytes); } - public void writeString(OutputSupplier supplier, String name) throws IOException - { - try (OutputStream out = supplier.getOutput()) { - writeString(out, name); - } - } - public void writeString(WritableByteChannel out, String name) throws IOException { byte[] nameBytes = StringUtils.toUtf8(name); @@ -89,7 +81,7 @@ public String readString(ByteBuffer in) throws IOException final int length = in.getInt(); return StringUtils.fromUtf8(readBytes(in, length)); } - + public byte[] readBytes(ByteBuffer in, int length) throws IOException { byte[] bytes = new byte[length]; diff --git a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index 1f4f03cb9594..f72986868619 100644 --- a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -33,13 +33,13 @@ import com.alibaba.rocketmq.remoting.exception.RemotingException; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Iterators; import 
com.google.common.collect.Sets; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.ParseException; @@ -199,7 +199,7 @@ public Firehose connect( return new Firehose() { - private Iterator nextIterator = Iterators.emptyIterator(); + private Iterator nextIterator = EmptyIterator.instance(); @Override public boolean hasMore() diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java index c65fa808d70c..81613c349b0e 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java +++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java @@ -22,11 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import com.google.common.io.Closeables; -import io.druid.java.util.common.parsers.ParseException; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.Committer; import io.druid.data.input.FirehoseFactoryV2; @@ -34,6 +31,9 @@ import io.druid.data.input.InputRow; import io.druid.firehose.kafka.KafkaSimpleConsumer.BytesMessageWithOffset; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.collect.EmptyIterator; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.java.util.emitter.EmittingLogger; import java.io.Closeable; import java.io.IOException; @@ -177,7 +177,7 @@ public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object private volatile boolean stopped; private volatile BytesMessageWithOffset msg = null; private volatile InputRow row = null; - private volatile Iterator nextIterator = Iterators.emptyIterator(); + private volatile Iterator nextIterator = EmptyIterator.instance(); { lastOffsetPartitions = Maps.newHashMap(); @@ -212,7 +212,7 @@ private void nextMessage() msg = messageQueue.take(); final byte[] message = msg.message(); nextIterator = message == null - ? Iterators.emptyIterator() + ? 
EmptyIterator.instance() : firehoseParser.parseBatch(ByteBuffer.wrap(message)).iterator(); continue; } diff --git a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java index f6759da27e90..0c23fb9fcef3 100644 --- a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java +++ b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Iterators; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -35,6 +34,7 @@ import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; +import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.logger.Logger; import net.jodah.lyra.ConnectionOptions; import net.jodah.lyra.Connections; @@ -205,7 +205,7 @@ public void shutdownCompleted(ShutdownSignalException cause) */ private long lastDeliveryTag; - private Iterator nextIterator = Iterators.emptyIterator(); + private Iterator nextIterator = EmptyIterator.instance(); @Override public boolean hasMore() diff --git a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java index 9d7c6c5ffbe7..033bb383324c 100644 --- a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ b/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java @@ -23,12 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; +import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.logger.Logger; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; @@ -108,7 +108,7 @@ public Firehose connect(final InputRowParser firehoseParser, File te return new Firehose() { - Iterator nextIterator = Iterators.emptyIterator(); + Iterator nextIterator = EmptyIterator.instance(); @Override public boolean hasMore() diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 16809561cbbc..db44e01e422a 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -23,9 +23,7 @@ import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; -import com.google.common.io.ByteStreams; import com.google.common.io.Files; -import com.google.common.io.OutputSupplier; import io.druid.indexer.updater.HadoopDruidConverterConfig; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.FileUtils; @@ -90,6 +88,7 @@ public static Path distributedClassPath(Path base) { return new 
Path(base, "classpath"); } + public static final String INDEX_ZIP = "index.zip"; public static final String DESCRIPTOR_JSON = "descriptor.json"; @@ -277,17 +276,9 @@ static void addSnapshotJarToClassPath( static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException { log.info("Uploading jar to path[%s]", path); - ByteStreams.copy( - Files.newInputStreamSupplier(jarFile), - new OutputSupplier() - { - @Override - public OutputStream getOutput() throws IOException - { - return fs.create(path); - } - } - ); + try (OutputStream os = fs.create(path)) { + Files.asByteSource(jarFile).copyTo(os); + } } static boolean isSnapshot(File jarFile) @@ -562,8 +553,10 @@ public static Path makeFileNamePath( DataSegmentPusher dataSegmentPusher ) { - return new Path(prependFSIfNullScheme(fs, basePath), - dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName)); + return new Path( + prependFSIfNullScheme(fs, basePath), + dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName) + ); } public static Path makeTmpPath( @@ -576,9 +569,10 @@ public static Path makeTmpPath( { return new Path( prependFSIfNullScheme(fs, basePath), - StringUtils.format("./%s.%d", - dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP), - taskAttemptID.getId() + StringUtils.format( + "./%s.%d", + dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP), + taskAttemptID.getId() ) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 4af077fe501a..be122a1e9702 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -26,8 +26,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -70,6 +72,7 @@ import io.druid.segment.realtime.plumber.Committers; import org.apache.commons.io.FileUtils; +import javax.annotation.Nullable; import java.io.File; import java.util.Collections; import java.util.Map; @@ -388,9 +391,9 @@ public RealtimeAppenderatorIngestionSpec getSpec() /** * Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than * abruptly stopping. - * + *

* This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this. - * + *

* Protected for tests. */ protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) @@ -431,19 +434,45 @@ private void publishSegments( String sequenceName ) { - ListenableFuture publishFuture = driver.publish( + final ListenableFuture publishFuture = driver.publish( publisher, committerSupplier.get(), Collections.singletonList(sequenceName) ); + final SettableFuture handoffDoneFuture = SettableFuture.create(); + Futures.addCallback(publishFuture, new FutureCallback() + { + @Override + public void onSuccess(@Nullable SegmentsAndMetadata result) + { + final ListenableFuture handoffFuture = driver.registerHandoff(result); + Futures.addCallback(handoffFuture, new FutureCallback() + { + @Override + public void onSuccess(@Nullable SegmentsAndMetadata result) + { + handoffDoneFuture.set(result); + } - ListenableFuture handoffFuture = Futures.transform(publishFuture, driver::registerHandoff); + @Override + public void onFailure(Throwable t) + { + handoffDoneFuture.setException(t); + } + }); + } - pendingHandoffs.add(handoffFuture); + @Override + public void onFailure(Throwable t) + { + handoffDoneFuture.setException(t); + } + }); + pendingHandoffs.add(handoffDoneFuture); } private void waitForSegmentPublishAndHandoff(long timeout) throws InterruptedException, ExecutionException, - TimeoutException + TimeoutException { if (!pendingHandoffs.isEmpty()) { ListenableFuture allHandoffs = Futures.allAsList(pendingHandoffs); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java index 066f01d05ae4..cb7b2282a68b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java @@ -59,7 +59,7 @@ public void testSimple() throws Exception final Map expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah"); for (Map.Entry entry : expected.entrySet()) { - final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput()); + final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().openStream()); final String string = StringUtils.fromUtf8(bytes); Assert.assertEquals(StringUtils.format("Read with offset %,d", entry.getKey()), string, entry.getValue()); } diff --git a/java-util/src/main/java/io/druid/java/util/common/collect/EmptyIterator.java b/java-util/src/main/java/io/druid/java/util/common/collect/EmptyIterator.java new file mode 100644 index 000000000000..109175d0a9b6 --- /dev/null +++ b/java-util/src/main/java/io/druid/java/util/common/collect/EmptyIterator.java @@ -0,0 +1,59 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.java.util.common.collect; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.function.Consumer; + +public class EmptyIterator implements Iterator +{ + public static final EmptyIterator INSTANCE = new EmptyIterator(); + + @SuppressWarnings("unchecked") + public static EmptyIterator instance() + { + return INSTANCE; + } + + @Override + public final boolean hasNext() + { + return false; + } + + @Override + public final T next() + { + throw new NoSuchElementException(); + } + + @Override + public final void remove() + { + throw new UnsupportedOperationException("remove"); + } + + @Override + public final void forEachRemaining(Consumer action) + { + + } +} diff --git a/pom.xml b/pom.xml index 9220865ca476..2978d922fc44 100644 --- a/pom.xml +++ b/pom.xml @@ -341,6 +341,11 @@ guice-multibindings ${guice.version} + + com.google.errorprone + error_prone_annotations + 2.2.0 + com.ibm.icu icu4j diff --git a/processing/pom.xml b/processing/pom.xml index 220abc4192f3..accd83ebe648 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -82,6 +82,10 @@ commons-io commons-io + + com.google.errorprone + error_prone_annotations + com.ibm.icu icu4j diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index eb3a6cc6268f..e4c38665d6ef 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -22,7 +22,6 @@ import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.inject.Inject; @@ -33,6 +32,7 @@ import io.druid.guice.annotations.Global; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.FunctionalIterator; @@ -320,7 +320,7 @@ public RowIterator(GroupByQuery query, final Cursor cursor, ByteBuffer metricsBu this.maxIntermediateRows = querySpecificConfig.getMaxIntermediateRows(); unprocessedKeys = null; - delegate = Iterators.emptyIterator(); + delegate = EmptyIterator.instance(); dimensionSpecs = query.getDimensions(); dimensions = Lists.newArrayListWithExpectedSize(dimensionSpecs.size()); dimNames = Lists.newArrayListWithExpectedSize(dimensionSpecs.size()); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java index 86a6229b9d97..a56ed696cb7e 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java @@ -20,9 +20,9 @@ package io.druid.query.groupby.epinephelinae; import com.google.common.base.Supplier; -import com.google.common.collect.Iterators; import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.collect.EmptyIterator; import 
io.druid.java.util.common.parsers.CloseableIterator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.ColumnSelectorFactory; @@ -171,7 +171,7 @@ public CloseableIterator> iterator(boolean sorted) if (!initialized) { // it's possible for iterator() to be called before initialization when // a nested groupBy's subquery has an empty result set (see testEmptySubquery() in GroupByQueryRunnerTest) - return CloseableIterators.withEmptyBaggage(Iterators.>emptyIterator()); + return CloseableIterators.withEmptyBaggage(EmptyIterator.>instance()); } if (sorted) { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java index b6c9834d2b2f..0ada53fb8d5e 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java @@ -20,10 +20,10 @@ package io.druid.query.groupby.epinephelinae; import com.google.common.base.Supplier; -import com.google.common.collect.Iterators; import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.parsers.CloseableIterator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.ColumnSelectorFactory; @@ -206,7 +206,7 @@ public CloseableIterator> iterator(boolean sorted) // it's possible for iterator() to be called before initialization when // a nested groupBy's subquery has an empty result set (see testEmptySubqueryWithLimitPushDown() // in GroupByQueryRunnerTest) - return CloseableIterators.withEmptyBaggage(Iterators.>emptyIterator()); + return CloseableIterators.withEmptyBaggage(EmptyIterator.>instance()); } if (sortHasNonGroupingFields) { @@ -378,6 +378,7 @@ private Comparator makeHeapComparator() aggregatorFactories, aggregatorOffsets ); + @Override public int compare(Integer o1, Integer o2) { @@ -453,7 +454,7 @@ public AlternatingByteBufferHashTable( subHashTable2Buffer.limit(tableArenaSize); subHashTable2Buffer = subHashTable2Buffer.slice(); - subHashTableBuffers = new ByteBuffer[] {subHashTable1Buffer, subHashTable2Buffer}; + subHashTableBuffers = new ByteBuffer[]{subHashTable1Buffer, subHashTable2Buffer}; } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java b/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java index 574204d7a514..22437a284fbc 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java @@ -19,9 +19,9 @@ package io.druid.query.groupby.orderby; -import com.google.common.collect.Iterators; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; +import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; @@ -43,7 +43,7 @@ public TopNSequence( public Iterator make() { if (limit <= 0) { - return Iterators.emptyIterator(); + return EmptyIterator.instance(); } // Materialize the topN values diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java 
b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 0fd9762dc7e9..1b1f51043bed 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -26,7 +26,6 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; @@ -40,21 +39,22 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelector; -import io.druid.java.util.common.concurrent.Execs; import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Smile; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Pair; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.collect.EmptyIterator; +import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.LazySequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.query.BySegmentResultValueClass; import io.druid.query.CacheStrategy; import io.druid.query.Query; @@ -255,7 +255,8 @@ private ImmutableMap makeDownstreamQueryContext() Sequence run(final UnaryOperator> timelineConverter) { - @Nullable TimelineLookup timeline = serverView.getTimeline(query.getDataSource()); + @Nullable + TimelineLookup timeline = serverView.getTimeline(query.getDataSource()); if (timeline == null) { return Sequences.empty(); } @@ -265,10 +266,13 @@ Sequence run(final UnaryOperator> time } final Set segments = computeSegmentsToQuery(timeline); - @Nullable final byte[] queryCacheKey = computeQueryCacheKey(); + @Nullable + final byte[] queryCacheKey = computeQueryCacheKey(); if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) { - @Nullable final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH); - @Nullable final String currentEtag = computeCurrentEtag(segments, queryCacheKey); + @Nullable + final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH); + @Nullable + final String currentEtag = computeCurrentEtag(segments, queryCacheKey); if (currentEtag != null && currentEtag.equals(prevEtag)) { return Sequences.empty(); } @@ -509,7 +513,7 @@ public Iterator make() { try { if (cachedResult.length == 0) { - return Iterators.emptyIterator(); + return EmptyIterator.instance(); } return objectMapper.readValues( diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 17df72a59967..5b6e04a6e522 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; -import 
com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -32,6 +31,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -117,7 +117,7 @@ public Iterator make() { try { if (cachedResult.length == 0) { - return Iterators.emptyIterator(); + return EmptyIterator.instance(); } return mapper.readValues( diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index c67b31054bf1..9eef59861d63 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.druid.data.input.InputRow; @@ -34,6 +35,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -41,11 +43,11 @@ /** * This class is specifialized for batch ingestion. In batch ingestion, the segment lifecycle is like: - * + *

*

  * APPENDING -> PUSHED_AND_DROPPED -> PUBLISHED
  * 
- * + *

*

    *
  * <li>APPENDING: Segment is available for appending.</li>
  * <li>PUSHED_AND_DROPPED: Segment is pushed to deep storage and dropped from the local storage.</li>
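The appenderator-driver hunks below replace Guava's Futures.transform(future, AsyncFunction) with hand-rolled callback chaining, for the reason given in the commit message: the async transform overload is deprecated in Guava 19 (where transformAsync appears) and removed in Guava 20, while transformAsync does not exist in earlier Guava lines, so no single spelling compiles across the versions Druid has to tolerate. A condensed sketch of the shape the pushAndClear() rewrite takes, as it would sit inside BatchAppenderatorDriver — pushThenDrop is an illustrative name rather than a method in the patch, while pushInBackground and dropInBackground are the existing BaseAppenderatorDriver methods:

    // Chain "push, then drop" with explicit callbacks instead of
    // Futures.transform(ListenableFuture, AsyncFunction), completing a plain JDK
    // CompletableFuture by hand so the caller can still wait on it with a timeout.
    private CompletableFuture<SegmentsAndMetadata> pushThenDrop(List<SegmentIdentifier> ids)
    {
      final CompletableFuture<SegmentsAndMetadata> chainEnd = new CompletableFuture<>();
      Futures.addCallback(
          pushInBackground(null, ids),
          new FutureCallback<SegmentsAndMetadata>()
          {
            @Override
            public void onSuccess(SegmentsAndMetadata pushed)
            {
              // Second async step only starts once the push has succeeded.
              Futures.addCallback(
                  dropInBackground(pushed),
                  new FutureCallback<SegmentsAndMetadata>()
                  {
                    @Override
                    public void onSuccess(SegmentsAndMetadata dropped)
                    {
                      chainEnd.complete(dropped);
                    }

                    @Override
                    public void onFailure(Throwable t)
                    {
                      chainEnd.completeExceptionally(t);
                    }
                  }
              );
            }

            @Override
            public void onFailure(Throwable t)
            {
              chainEnd.completeExceptionally(t);
            }
          }
      );
      return chainEnd;
    }

Futures.addCallback and SettableFuture keep the same signatures on either side of the Guava 19/20 split, and CompletableFuture is plain JDK 8, which is why the rewrites in this patch standardize on them.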
  • @@ -57,9 +59,9 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver /** * Create a driver. * - * @param appenderator appenderator - * @param segmentAllocator segment allocator - * @param usedSegmentChecker used segment checker + * @param appenderator appenderator + * @param segmentAllocator segment allocator + * @param usedSegmentChecker used segment checker */ public BatchAppenderatorDriver( Appenderator appenderator, @@ -72,7 +74,7 @@ public BatchAppenderatorDriver( /** * This method always returns null because batch ingestion doesn't support restoring tasks on failures. - + * * @return always null */ @Override @@ -132,14 +134,40 @@ private SegmentsAndMetadata pushAndClear( .map(SegmentWithState::getSegmentIdentifier) .collect(Collectors.toList()); - final ListenableFuture future = Futures.transform( - pushInBackground(null, segmentIdentifierList), - this::dropInBackground - ); + final CompletableFuture chainEndFuture = new CompletableFuture<>(); + final ListenableFuture pushFuture = pushInBackground(null, segmentIdentifierList); + Futures.addCallback(pushFuture, new FutureCallback() + { + @Override + public void onSuccess(@Nullable SegmentsAndMetadata result) + { + final ListenableFuture droFuture = dropInBackground(result); + Futures.addCallback(droFuture, new FutureCallback() + { + @Override + public void onSuccess(@Nullable SegmentsAndMetadata result) + { + chainEndFuture.complete(result); + } + + @Override + public void onFailure(Throwable t) + { + chainEndFuture.completeExceptionally(t); + } + }); + } + + @Override + public void onFailure(Throwable t) + { + chainEndFuture.completeExceptionally(t); + } + }); final SegmentsAndMetadata segmentsAndMetadata = pushAndClearTimeoutMs == 0L ? - future.get() : - future.get(pushAndClearTimeoutMs, TimeUnit.MILLISECONDS); + chainEndFuture.get() : + chainEndFuture.get(pushAndClearTimeoutMs, TimeUnit.MILLISECONDS); // Sanity check final Map pushedSegmentIdToSegmentMap = segmentsAndMetadata @@ -198,7 +226,8 @@ public ListenableFuture publishAll(final TransactionalSegme .map(segmentWithState -> Preconditions.checkNotNull( segmentWithState.getDataSegment(), "dataSegment for segmentId[%s]", - segmentWithState.getSegmentIdentifier()) + segmentWithState.getSegmentIdentifier() + ) ) .collect(Collectors.toList()), null diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index ab2c0262e9f0..bd43375e58c8 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -20,11 +20,9 @@ package io.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -54,11 +52,11 @@ /** * This class is specialized for streaming ingestion. In streaming ingestion, the segment lifecycle is like: - * + *

    *

      * APPENDING -> APPEND_FINISHED -> PUBLISHED
      * 
    - * + *

    *

      *
  * <li>APPENDING: Segment is available for appending.</li>
  * <li>APPEND_FINISHED: Segment cannot be updated (data cannot be added anymore) and is waiting for being published.</li>
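The StreamAppenderatorDriver publish() hunk below inlines the same nested-callback chaining around pushInBackground and publishInBackground. Patch 04/15 later in this series pulls that boilerplate into a shared helper, io.druid.java.util.common.concurrent.ListenableFutures, so call sites collapse back to a single expression; the realtime task's handoff registration, for example, becomes:

    // From the patch-04 version of AppenderatorDriverRealtimeIndexTask.publishSegments():
    // publish, then register handoff, chained through the new helper rather than
    // Guava's deprecated/removed async transform overload.
    pendingHandoffs.add(ListenableFutures.transform(publishFuture, driver::registerHandoff));

The helper's signature, with the generic parameters written out, is static <I, O> ListenableFuture<O> transform(ListenableFuture<I> inFuture, Function<I, ListenableFuture<O>> transform), built on the same Futures.addCallback plus SettableFuture pattern shown above.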
    • @@ -269,23 +267,43 @@ public ListenableFuture publish( .map(SegmentWithState::getSegmentIdentifier) .collect(Collectors.toList()); - final ListenableFuture publishFuture = Futures.transform( - pushInBackground(wrapCommitter(committer), theSegments), - (AsyncFunction) segmentsAndMetadata -> publishInBackground( - segmentsAndMetadata, + final SettableFuture publishCompleted = SettableFuture.create(); + final ListenableFuture publishFuture = pushInBackground(wrapCommitter(committer), theSegments); + Futures.addCallback(publishFuture, new FutureCallback() + { + @Override + public void onSuccess(@Nullable SegmentsAndMetadata result) + { + final ListenableFuture publishMoreFuture = publishInBackground( + result, publisher - ) - ); + ); + Futures.addCallback(publishFuture, new FutureCallback() + { + @Override + public void onSuccess(@Nullable SegmentsAndMetadata result) + { + synchronized (segments) { + sequenceNames.forEach(segments::remove); + } + publishCompleted.set(result); + } - return Futures.transform( - publishFuture, - (Function) segmentsAndMetadata -> { - synchronized (segments) { - sequenceNames.forEach(segments::remove); + @Override + public void onFailure(Throwable t) + { + publishCompleted.setException(t); } - return segmentsAndMetadata; - } - ); + }); + } + + @Override + public void onFailure(Throwable t) + { + publishCompleted.setException(t); + } + }); + return publishCompleted; } /** @@ -378,10 +396,23 @@ public ListenableFuture publishAndRegisterHandoff( final Collection sequenceNames ) { - return Futures.transform( - publish(publisher, committer, sequenceNames), - this::registerHandoff - ); + final SettableFuture publishDoneFuture = SettableFuture.create(); + final ListenableFuture publishFuture = publish(publisher, committer, sequenceNames); + Futures.addCallback(publishFuture, new FutureCallback() + { + @Override + public void onSuccess(@Nullable SegmentsAndMetadata result) + { + publishDoneFuture.set(result); + } + + @Override + public void onFailure(Throwable t) + { + publishDoneFuture.setException(t); + } + }); + return publishDoneFuture; } @Override diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java index 6927e46d91fb..828c58933583 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.ircclouds.irc.api.Callback; import com.ircclouds.irc.api.IRCApi; @@ -37,6 +36,7 @@ import io.druid.data.input.impl.InputRowParser; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Pair; +import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.logger.Logger; import org.joda.time.DateTime; @@ -189,7 +189,7 @@ public void onFailure(Exception e) return new Firehose() { InputRow nextRow = null; - Iterator nextIterator = Iterators.emptyIterator(); + Iterator nextIterator = EmptyIterator.instance(); @Override public boolean hasMore() From a54c7e5bff6d8c8e8effdfeac9251ab9fe4b8841 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 22 Feb 2018 07:58:24 -0800 Subject: [PATCH 02/15] Use `Collections.emptyIterator()` --- .../rocketmq/RocketMQFirehoseFactory.java | 4 +- 
...fkaEightSimpleConsumerFirehoseFactory.java | 6 +- .../rabbitmq/RabbitMQFirehoseFactory.java | 4 +- .../kafka/KafkaEightFirehoseFactory.java | 4 +- .../util/common/collect/EmptyIterator.java | 59 ------------------- .../query/groupby/GroupByQueryEngine.java | 4 +- .../epinephelinae/BufferHashGrouper.java | 3 +- .../LimitedBufferHashGrouper.java | 3 +- .../query/groupby/orderby/TopNSequence.java | 4 +- .../druid/client/CachingClusteredClient.java | 3 +- .../io/druid/client/CachingQueryRunner.java | 3 +- .../realtime/firehose/IrcFirehoseFactory.java | 4 +- 12 files changed, 19 insertions(+), 82 deletions(-) delete mode 100644 java-util/src/main/java/io/druid/java/util/common/collect/EmptyIterator.java diff --git a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index f72986868619..c044de77105a 100644 --- a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -39,7 +39,6 @@ import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; import io.druid.java.util.common.StringUtils; -import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.ParseException; @@ -48,6 +47,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; @@ -199,7 +199,7 @@ public Firehose connect( return new Firehose() { - private Iterator nextIterator = EmptyIterator.instance(); + private Iterator nextIterator = Collections.emptyIterator(); @Override public boolean hasMore() diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java index 81613c349b0e..982c7dc12920 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java +++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java @@ -31,13 +31,13 @@ import io.druid.data.input.InputRow; import io.druid.firehose.kafka.KafkaSimpleConsumer.BytesMessageWithOffset; import io.druid.java.util.common.StringUtils; -import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.parsers.ParseException; import io.druid.java.util.emitter.EmittingLogger; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -177,7 +177,7 @@ public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object private volatile boolean stopped; private volatile BytesMessageWithOffset msg = null; private volatile InputRow row = null; - private volatile Iterator nextIterator = EmptyIterator.instance(); + private volatile Iterator nextIterator = Collections.emptyIterator(); { lastOffsetPartitions = Maps.newHashMap(); @@ -212,7 +212,7 @@ private void nextMessage() msg = messageQueue.take(); final byte[] 
message = msg.message(); nextIterator = message == null - ? EmptyIterator.instance() + ? Collections.emptyIterator() : firehoseParser.parseBatch(ByteBuffer.wrap(message)).iterator(); continue; } diff --git a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java index 0c23fb9fcef3..134c3eaf90e7 100644 --- a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java +++ b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java @@ -34,7 +34,6 @@ import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; -import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.logger.Logger; import net.jodah.lyra.ConnectionOptions; import net.jodah.lyra.Connections; @@ -46,6 +45,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -205,7 +205,7 @@ public void shutdownCompleted(ShutdownSignalException cause) */ private long lastDeliveryTag; - private Iterator nextIterator = EmptyIterator.instance(); + private Iterator nextIterator = Collections.emptyIterator(); @Override public boolean hasMore() diff --git a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java index 033bb383324c..5f160d3f12ec 100644 --- a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ b/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java @@ -28,7 +28,6 @@ import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; -import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.logger.Logger; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; @@ -41,6 +40,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -108,7 +108,7 @@ public Firehose connect(final InputRowParser firehoseParser, File te return new Firehose() { - Iterator nextIterator = EmptyIterator.instance(); + Iterator nextIterator = Collections.emptyIterator(); @Override public boolean hasMore() diff --git a/java-util/src/main/java/io/druid/java/util/common/collect/EmptyIterator.java b/java-util/src/main/java/io/druid/java/util/common/collect/EmptyIterator.java deleted file mode 100644 index 109175d0a9b6..000000000000 --- a/java-util/src/main/java/io/druid/java/util/common/collect/EmptyIterator.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.collect; - -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.function.Consumer; - -public class EmptyIterator implements Iterator -{ - public static final EmptyIterator INSTANCE = new EmptyIterator(); - - @SuppressWarnings("unchecked") - public static EmptyIterator instance() - { - return INSTANCE; - } - - @Override - public final boolean hasNext() - { - return false; - } - - @Override - public final T next() - { - throw new NoSuchElementException(); - } - - @Override - public final void remove() - { - throw new UnsupportedOperationException("remove"); - } - - @Override - public final void forEachRemaining(Consumer action) - { - - } -} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index e4c38665d6ef..02fdbafcc40e 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -32,7 +32,6 @@ import io.druid.guice.annotations.Global; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; -import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.FunctionalIterator; @@ -58,6 +57,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -320,7 +320,7 @@ public RowIterator(GroupByQuery query, final Cursor cursor, ByteBuffer metricsBu this.maxIntermediateRows = querySpecificConfig.getMaxIntermediateRows(); unprocessedKeys = null; - delegate = EmptyIterator.instance(); + delegate = Collections.emptyIterator(); dimensionSpecs = query.getDimensions(); dimensions = Lists.newArrayListWithExpectedSize(dimensionSpecs.size()); dimNames = Lists.newArrayListWithExpectedSize(dimensionSpecs.size()); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java index a56ed696cb7e..21f7abb0f26c 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java @@ -22,7 +22,6 @@ import com.google.common.base.Supplier; import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.IAE; -import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.parsers.CloseableIterator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.ColumnSelectorFactory; @@ -171,7 +170,7 @@ public CloseableIterator> iterator(boolean sorted) if (!initialized) { // it's possible for iterator() to be called before initialization when // a nested groupBy's subquery has an empty result set (see testEmptySubquery() in 
GroupByQueryRunnerTest) - return CloseableIterators.withEmptyBaggage(EmptyIterator.>instance()); + return CloseableIterators.withEmptyBaggage(Collections.>emptyIterator()); } if (sorted) { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java index 0ada53fb8d5e..694293e26781 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java @@ -23,7 +23,6 @@ import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; -import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.parsers.CloseableIterator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.ColumnSelectorFactory; @@ -206,7 +205,7 @@ public CloseableIterator> iterator(boolean sorted) // it's possible for iterator() to be called before initialization when // a nested groupBy's subquery has an empty result set (see testEmptySubqueryWithLimitPushDown() // in GroupByQueryRunnerTest) - return CloseableIterators.withEmptyBaggage(EmptyIterator.>instance()); + return CloseableIterators.withEmptyBaggage(Collections.>emptyIterator()); } if (sortHasNonGroupingFields) { diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java b/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java index 22437a284fbc..0bb5f6aab25d 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java @@ -21,11 +21,11 @@ import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; -import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; +import java.util.Collections; import java.util.Iterator; public class TopNSequence extends BaseSequence> @@ -43,7 +43,7 @@ public TopNSequence( public Iterator make() { if (limit <= 0) { - return EmptyIterator.instance(); + return Collections.emptyIterator(); } // Materialize the topN values diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 1b1f51043bed..1e2e441949d8 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -48,7 +48,6 @@ import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Pair; import io.druid.java.util.common.StringUtils; -import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.LazySequence; @@ -513,7 +512,7 @@ public Iterator make() { try { if (cachedResult.length == 0) { - return EmptyIterator.instance(); + return Collections.emptyIterator(); } return objectMapper.readValues( diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 5b6e04a6e522..54ecc5c35a90 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ 
b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -31,7 +31,6 @@ import com.google.common.util.concurrent.SettableFuture; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; -import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -117,7 +116,7 @@ public Iterator make() { try { if (cachedResult.length == 0) { - return EmptyIterator.instance(); + return Collections.emptyIterator(); } return mapper.readValues( diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java index 828c58933583..b1b0749d23ff 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java @@ -36,13 +36,13 @@ import io.druid.data.input.impl.InputRowParser; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Pair; -import io.druid.java.util.common.collect.EmptyIterator; import io.druid.java.util.common.logger.Logger; import org.joda.time.DateTime; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -189,7 +189,7 @@ public void onFailure(Exception e) return new Firehose() { InputRow nextRow = null; - Iterator nextIterator = EmptyIterator.instance(); + Iterator nextIterator = Collections.emptyIterator(); @Override public boolean hasMore() From 6f8cd33e3299fdd2be9694d679dae98a7b434f14 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 22 Feb 2018 08:26:16 -0800 Subject: [PATCH 03/15] Pretty formatting --- .../appenderator/BatchAppenderatorDriver.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index 9eef59861d63..d441f60c1667 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -223,11 +223,12 @@ public ListenableFuture publishAll(final TransactionalSegme .values() .stream() .flatMap(SegmentsForSequence::segmentStateStream) - .map(segmentWithState -> Preconditions.checkNotNull( - segmentWithState.getDataSegment(), - "dataSegment for segmentId[%s]", - segmentWithState.getSegmentIdentifier() - ) + .map(segmentWithState -> Preconditions + .checkNotNull( + segmentWithState.getDataSegment(), + "dataSegment for segmentId[%s]", + segmentWithState.getSegmentIdentifier() + ) ) .collect(Collectors.toList()), null From 6491e478fa6b8b625b1d14a0d8b9b98312b53358 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 22 Feb 2018 17:00:48 -0800 Subject: [PATCH 04/15] Make listenable future transforms a thing in default druid --- .../AppenderatorDriverRealtimeIndexTask.java | 35 +--------- .../common/concurrent/ListenableFutures.java | 64 +++++++++++++++++ .../appenderator/BatchAppenderatorDriver.java | 43 +++--------- .../StreamAppenderatorDriver.java | 69 +++++-------------- 4 files changed, 92 insertions(+), 119 deletions(-) create mode 100644 
java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index be122a1e9702..bddfdc2d56a8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -26,10 +26,8 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -48,6 +46,7 @@ import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.ListenableFutures; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.parsers.ParseException; import io.druid.java.util.emitter.EmittingLogger; @@ -72,7 +71,6 @@ import io.druid.segment.realtime.plumber.Committers; import org.apache.commons.io.FileUtils; -import javax.annotation.Nullable; import java.io.File; import java.util.Collections; import java.util.Map; @@ -439,36 +437,7 @@ private void publishSegments( committerSupplier.get(), Collections.singletonList(sequenceName) ); - final SettableFuture handoffDoneFuture = SettableFuture.create(); - Futures.addCallback(publishFuture, new FutureCallback() - { - @Override - public void onSuccess(@Nullable SegmentsAndMetadata result) - { - final ListenableFuture handoffFuture = driver.registerHandoff(result); - Futures.addCallback(handoffFuture, new FutureCallback() - { - @Override - public void onSuccess(@Nullable SegmentsAndMetadata result) - { - handoffDoneFuture.set(result); - } - - @Override - public void onFailure(Throwable t) - { - handoffDoneFuture.setException(t); - } - }); - } - - @Override - public void onFailure(Throwable t) - { - handoffDoneFuture.setException(t); - } - }); - pendingHandoffs.add(handoffDoneFuture); + pendingHandoffs.add(ListenableFutures.transform(publishFuture, driver::registerHandoff)); } private void waitForSegmentPublishAndHandoff(long timeout) throws InterruptedException, ExecutionException, diff --git a/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java new file mode 100644 index 000000000000..14b301104226 --- /dev/null +++ b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java @@ -0,0 +1,64 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.java.util.common.concurrent; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +import javax.annotation.Nullable; +import java.util.function.Function; + +public class ListenableFutures +{ + public static ListenableFuture transform(final ListenableFuture inFuture, final Function> transform) { + final SettableFuture finalFuture = SettableFuture.create(); + Futures.addCallback(inFuture, new FutureCallback() + { + @Override + public void onSuccess(@Nullable I result) + { + final ListenableFuture transformFuture = transform.apply(result); + Futures.addCallback(transformFuture, new FutureCallback() + { + @Override + public void onSuccess(@Nullable O result) + { + finalFuture.set(result); + } + + @Override + public void onFailure(Throwable t) + { + finalFuture.setException(t); + } + }); + } + + @Override + public void onFailure(Throwable t) + { + finalFuture.setException(t); + } + }); + return finalFuture; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index d441f60c1667..998dbb06f5aa 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -22,11 +22,10 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.druid.data.input.InputRow; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.concurrent.ListenableFutures; import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import io.druid.timeline.DataSegment; @@ -35,7 +34,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -134,40 +132,15 @@ private SegmentsAndMetadata pushAndClear( .map(SegmentWithState::getSegmentIdentifier) .collect(Collectors.toList()); - final CompletableFuture chainEndFuture = new CompletableFuture<>(); - final ListenableFuture pushFuture = pushInBackground(null, segmentIdentifierList); - Futures.addCallback(pushFuture, new FutureCallback() - { - @Override - public void onSuccess(@Nullable SegmentsAndMetadata result) - { - final ListenableFuture droFuture = dropInBackground(result); - Futures.addCallback(droFuture, new FutureCallback() - { - @Override - public void onSuccess(@Nullable SegmentsAndMetadata result) - { - chainEndFuture.complete(result); - } - - @Override - public void onFailure(Throwable t) - { - 
chainEndFuture.completeExceptionally(t); - } - }); - } - - @Override - public void onFailure(Throwable t) - { - chainEndFuture.completeExceptionally(t); - } - }); + final ListenableFuture future = ListenableFutures + .transform( + pushInBackground(null, segmentIdentifierList), + this::dropInBackground + ); final SegmentsAndMetadata segmentsAndMetadata = pushAndClearTimeoutMs == 0L ? - chainEndFuture.get() : - chainEndFuture.get(pushAndClearTimeoutMs, TimeUnit.MILLISECONDS); + future.get() : + future.get(pushAndClearTimeoutMs, TimeUnit.MILLISECONDS); // Sanity check final Map pushedSegmentIdToSegmentMap = segmentsAndMetadata diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index bd43375e58c8..569007acf11d 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -31,6 +31,7 @@ import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.concurrent.ListenableFutures; import io.druid.java.util.common.logger.Logger; import io.druid.query.SegmentDescriptor; import io.druid.segment.realtime.FireDepartmentMetrics; @@ -267,43 +268,21 @@ public ListenableFuture publish( .map(SegmentWithState::getSegmentIdentifier) .collect(Collectors.toList()); - final SettableFuture publishCompleted = SettableFuture.create(); - final ListenableFuture publishFuture = pushInBackground(wrapCommitter(committer), theSegments); - Futures.addCallback(publishFuture, new FutureCallback() - { - @Override - public void onSuccess(@Nullable SegmentsAndMetadata result) - { - final ListenableFuture publishMoreFuture = publishInBackground( - result, + final ListenableFuture publishFuture = ListenableFutures.transform( + pushInBackground(wrapCommitter(committer), theSegments), + sam -> publishInBackground( + sam, publisher - ); - Futures.addCallback(publishFuture, new FutureCallback() - { - @Override - public void onSuccess(@Nullable SegmentsAndMetadata result) - { - synchronized (segments) { - sequenceNames.forEach(segments::remove); - } - publishCompleted.set(result); - } - - @Override - public void onFailure(Throwable t) - { - publishCompleted.setException(t); - } - }); - } - - @Override - public void onFailure(Throwable t) - { - publishCompleted.setException(t); + ) + ); + return ListenableFutures.transform(publishFuture, sam -> { + synchronized (segments) { + sequenceNames.forEach(segments::remove); } + final SettableFuture future = SettableFuture.create(); + future.set(sam); + return future; }); - return publishCompleted; } /** @@ -396,23 +375,11 @@ public ListenableFuture publishAndRegisterHandoff( final Collection sequenceNames ) { - final SettableFuture publishDoneFuture = SettableFuture.create(); - final ListenableFuture publishFuture = publish(publisher, committer, sequenceNames); - Futures.addCallback(publishFuture, new FutureCallback() - { - @Override - public void onSuccess(@Nullable SegmentsAndMetadata result) - { - publishDoneFuture.set(result); - } - - @Override - public void onFailure(Throwable t) - { - publishDoneFuture.setException(t); - } - }); - return publishDoneFuture; + return ListenableFutures + .transform( + publish(publisher, committer, sequenceNames), + this::registerHandoff + ); } @Override From 
b9cc2ab0219d51186adbeb4492d877fd17588652 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 22 Feb 2018 17:08:28 -0800 Subject: [PATCH 05/15] Format fix --- .../java/util/common/concurrent/ListenableFutures.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java index 14b301104226..93c0b02f5f8a 100644 --- a/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java +++ b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java @@ -29,7 +29,11 @@ public class ListenableFutures { - public static ListenableFuture transform(final ListenableFuture inFuture, final Function> transform) { + public static ListenableFuture transform( + final ListenableFuture inFuture, + final Function> transform + ) + { final SettableFuture finalFuture = SettableFuture.create(); Futures.addCallback(inFuture, new FutureCallback() { From 7e34fd38a790e49814e2d419e23e011fa4473b93 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 22 Feb 2018 17:25:50 -0800 Subject: [PATCH 06/15] Add forbidden guava apis --- codestyle/guava-forbidden-apis.txt | 1 + integration-tests/pom.xml | 2 ++ pom.xml | 1 + 3 files changed, 4 insertions(+) create mode 100644 codestyle/guava-forbidden-apis.txt diff --git a/codestyle/guava-forbidden-apis.txt b/codestyle/guava-forbidden-apis.txt new file mode 100644 index 000000000000..961554890799 --- /dev/null +++ b/codestyle/guava-forbidden-apis.txt @@ -0,0 +1 @@ +com.google.common.util.concurrent.Futures.transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures.transform \ No newline at end of file diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 1df4c94cf5e7..99b8c93cb6e0 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -229,6 +229,7 @@ ../codestyle/joda-time-forbidden-apis.txt ../codestyle/druid-forbidden-apis.txt + ../codestyle/guava-forbidden-apis.txt @@ -278,6 +279,7 @@ ../codestyle/joda-time-forbidden-apis.txt ../codestyle/druid-forbidden-apis.txt + ../codestyle/guava-forbidden-apis.txt diff --git a/pom.xml b/pom.xml index 2978d922fc44..c371c002e125 100644 --- a/pom.xml +++ b/pom.xml @@ -960,6 +960,7 @@ ${session.executionRootDirectory}/codestyle/joda-time-forbidden-apis.txt ${session.executionRootDirectory}/codestyle/druid-forbidden-apis.txt + ${session.executionRootDirectory}/codestyle/guava-forbidden-apis.txt io/druid/java/util/common/DateTimes$UtcFormatter.class From f831dcb667ee5f53686ef12e8fb8d2440758a9df Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Mon, 26 Feb 2018 17:54:19 -0800 Subject: [PATCH 07/15] Make the ListenableFutrues.transformAsync have comments --- codestyle/druid-forbidden-apis.txt | 3 ++- codestyle/guava-forbidden-apis.txt | 1 - .../druid/data/input/parquet/GenericRecordAsMap.java | 2 +- .../task/AppenderatorDriverRealtimeIndexTask.java | 2 +- integration-tests/pom.xml | 2 -- .../java/util/common/concurrent/ListenableFutures.java | 7 ++++++- pom.xml | 1 - .../main/java/io/druid/query/BitmapResultFactory.java | 2 +- .../java/io/druid/query/extraction/ExtractionFn.java | 2 +- .../java/io/druid/query/filter/DimFilterUtils.java | 4 ++-- .../src/main/java/io/druid/segment/filter/Filters.java | 2 +- .../segment/incremental/IncrementalIndexAdapter.java | 2 +- 
.../java/io/druid/segment/transform/Transform.java | 10 +++++----- .../java/io/druid/segment/transform/TransformSpec.java | 4 ++-- .../realtime/appenderator/BatchAppenderatorDriver.java | 2 +- .../appenderator/StreamAppenderatorDriver.java | 6 +++--- 16 files changed, 27 insertions(+), 25 deletions(-) delete mode 100644 codestyle/guava-forbidden-apis.txt diff --git a/codestyle/druid-forbidden-apis.txt b/codestyle/druid-forbidden-apis.txt index abc3d6e00f4d..64c26e7c66be 100644 --- a/codestyle/druid-forbidden-apis.txt +++ b/codestyle/druid-forbidden-apis.txt @@ -1,2 +1,3 @@ com.google.common.collect.MapMaker @ Create java.util.concurrent.ConcurrentHashMap directly -com.google.common.collect.Maps#newConcurrentMap() @ Create java.util.concurrent.ConcurrentHashMap directly \ No newline at end of file +com.google.common.collect.Maps#newConcurrentMap() @ Create java.util.concurrent.ConcurrentHashMap directly +com.google.common.util.concurrent.Futures.transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures.transformAsync diff --git a/codestyle/guava-forbidden-apis.txt b/codestyle/guava-forbidden-apis.txt deleted file mode 100644 index 961554890799..000000000000 --- a/codestyle/guava-forbidden-apis.txt +++ /dev/null @@ -1 +0,0 @@ -com.google.common.util.concurrent.Futures.transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures.transform \ No newline at end of file diff --git a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/GenericRecordAsMap.java b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/GenericRecordAsMap.java index 3f568c24e149..8cbb913bb945 100644 --- a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/GenericRecordAsMap.java +++ b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/GenericRecordAsMap.java @@ -74,7 +74,7 @@ public boolean containsValue(Object value) *
  * <li>null, boolean, int, long, float, double, string, Records, Enums, Maps, Fixed -> String, using String.valueOf</li>
  * <li>bytes -> Arrays.toString() or new String if binaryAsString is true</li>
- * <li>Arrays -> List<String>, using Lists.transform(<List>dimValue, TO_STRING_INCLUDING_NULL)</li>
+ * <li>Arrays -> List<String>, using Lists.transformAsync(<List>dimValue, TO_STRING_INCLUDING_NULL)</li>
  * </ul>
  * <li> avro schema type -> druid metric:</li>
  * <ul>
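The hunks that follow rename the helper to transformAsync and move the indexing-task call sites onto it. As a standalone usage sketch only (the class name, executor, and string values are illustrative and not part of the patch; the helper is the one added in java-util earlier in this series):

    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import io.druid.java.util.common.concurrent.ListenableFutures;

    import java.util.concurrent.Executors;

    public class TransformAsyncExample
    {
      public static void main(String[] args) throws Exception
      {
        final ListeningExecutorService exec =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

        // First asynchronous step, standing in for pushInBackground()/publish().
        final ListenableFuture<String> publishFuture = exec.submit(() -> "segment-2018-02-22");

        // Second step starts only after the first succeeds; a failure in either step
        // propagates to the returned future. This is the pattern that replaces the
        // nested FutureCallback chains removed elsewhere in this series.
        final ListenableFuture<String> handoffFuture = ListenableFutures.transformAsync(
            publishFuture,
            segmentId -> exec.submit(() -> segmentId + " handed off")
        );

        System.out.println(handoffFuture.get());
        exec.shutdownNow();
      }
    }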
        diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index bddfdc2d56a8..3181b252544a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -437,7 +437,7 @@ private void publishSegments( committerSupplier.get(), Collections.singletonList(sequenceName) ); - pendingHandoffs.add(ListenableFutures.transform(publishFuture, driver::registerHandoff)); + pendingHandoffs.add(ListenableFutures.transformAsync(publishFuture, driver::registerHandoff)); } private void waitForSegmentPublishAndHandoff(long timeout) throws InterruptedException, ExecutionException, diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 99b8c93cb6e0..1df4c94cf5e7 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -229,7 +229,6 @@ ../codestyle/joda-time-forbidden-apis.txt ../codestyle/druid-forbidden-apis.txt - ../codestyle/guava-forbidden-apis.txt @@ -279,7 +278,6 @@ ../codestyle/joda-time-forbidden-apis.txt ../codestyle/druid-forbidden-apis.txt - ../codestyle/guava-forbidden-apis.txt diff --git a/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java index 93c0b02f5f8a..a26360f6fd61 100644 --- a/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java +++ b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java @@ -29,7 +29,12 @@ public class ListenableFutures { - public static ListenableFuture transform( + /** + * Guava 19 changes the Futures.transform signature so that the async form is different. This is here as a + * compatability layer until such a time as druid only supports Guava 19 or later, in which case + * Futrues.transformAsync should be used + */ + public static ListenableFuture transformAsync( final ListenableFuture inFuture, final Function> transform ) diff --git a/pom.xml b/pom.xml index c371c002e125..2978d922fc44 100644 --- a/pom.xml +++ b/pom.xml @@ -960,7 +960,6 @@ ${session.executionRootDirectory}/codestyle/joda-time-forbidden-apis.txt ${session.executionRootDirectory}/codestyle/druid-forbidden-apis.txt - ${session.executionRootDirectory}/codestyle/guava-forbidden-apis.txt io/druid/java/util/common/DateTimes$UtcFormatter.class diff --git a/processing/src/main/java/io/druid/query/BitmapResultFactory.java b/processing/src/main/java/io/druid/query/BitmapResultFactory.java index b4cffb7f67d8..b7ebf3ec6857 100644 --- a/processing/src/main/java/io/druid/query/BitmapResultFactory.java +++ b/processing/src/main/java/io/druid/query/BitmapResultFactory.java @@ -83,7 +83,7 @@ public interface BitmapResultFactory T union(Iterable bitmapResults); /** - * Equivalent of intersection(Iterables.transform(dimensionValueBitmaps, factory::wrapDimensionValue)), but doesn't + * Equivalent of intersection(Iterables.transformAsync(dimensionValueBitmaps, factory::wrapDimensionValue)), but doesn't * create a lot of bitmap result objects. 
*/ T unionDimensionValueBitmaps(Iterable dimensionValueBitmaps); diff --git a/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java b/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java index 9f7afb8b21c4..93fc26490891 100644 --- a/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java +++ b/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java @@ -49,7 +49,7 @@ @JsonSubTypes.Type(name = "strlen", value = StrlenExtractionFn.class) }) /** - * An ExtractionFn is a function that can be used to transform the values of a column (typically a dimension). + * An ExtractionFn is a function that can be used to transformAsync the values of a column (typically a dimension). * Note that ExtractionFn implementations are expected to be Threadsafe. * * A simple example of the type of operation this enables is the RegexDimExtractionFn which applies a diff --git a/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java b/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java index 42ee9eeba77a..89f7655071cb 100644 --- a/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java +++ b/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java @@ -88,7 +88,7 @@ static byte[] computeCacheKey(byte cacheIdKey, List filters) * @param dimFilter The filter to use * @param input The iterable of objects to be filtered * @param converter The function to convert T to ShardSpec that can be filtered by - * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * @param This can be any type, as long as transformAsync function is provided to convert this to ShardSpec * @return The set of filtered object, in the same order as input */ public static Set filterShards(DimFilter dimFilter, Iterable input, Function converter) @@ -109,7 +109,7 @@ public static Set filterShards(DimFilter dimFilter, Iterable input, Fu * @param input The iterable of objects to be filtered * @param converter The function to convert T to ShardSpec that can be filtered by * @param dimensionRangeCache The cache of RangeSets of different dimensions for the dimFilter - * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * @param This can be any type, as long as transformAsync function is provided to convert this to ShardSpec * @return The set of filtered object, in the same order as input */ public static Set filterShards(DimFilter dimFilter, Iterable input, Function converter, diff --git a/processing/src/main/java/io/druid/segment/filter/Filters.java b/processing/src/main/java/io/druid/segment/filter/Filters.java index 684fd2f28fb5..b057e8c9ceaf 100644 --- a/processing/src/main/java/io/druid/segment/filter/Filters.java +++ b/processing/src/main/java/io/druid/segment/filter/Filters.java @@ -199,7 +199,7 @@ public static ImmutableBitmap allTrue(final BitmapIndexSelector selector) */ static Iterable bitmapsFromIndexes(final IntIterable indexes, final BitmapIndex bitmapIndex) { - // Do not use Iterables.transform() to avoid boxing/unboxing integers. + // Do not use Iterables.transformAsync() to avoid boxing/unboxing integers. 
return new Iterable() { @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 8eb148d6bb4a..c80a1e5247f5 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -169,7 +169,7 @@ public Iterator iterator() } /* - * Note that the transform function increments a counter to determine the rowNum of + * Note that the transformAsync function increments a counter to determine the rowNum of * the iterated Rowboats. We need to return a new iterator on each * iterator() call to ensure the counter starts at 0. */ diff --git a/processing/src/main/java/io/druid/segment/transform/Transform.java b/processing/src/main/java/io/druid/segment/transform/Transform.java index fe3b043d5197..bf21552747bf 100644 --- a/processing/src/main/java/io/druid/segment/transform/Transform.java +++ b/processing/src/main/java/io/druid/segment/transform/Transform.java @@ -23,12 +23,12 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; /** - * A row transform that is part of a {@link TransformSpec}. Transforms allow adding new fields to input rows. Each + * A row transformAsync that is part of a {@link TransformSpec}. Transforms allow adding new fields to input rows. Each * one has a "name" (the name of the new field) which can be referred to by DimensionSpecs, AggregatorFactories, etc. * Each also has a "row function", which produces values for this new field based on looking at the entire input row. * - * If a transform has the same name as a field in an input row, then it will shadow the original field. Transforms - * that shadow fields may still refer to the fields they shadow. This can be used to transform a field "in-place". + * If a transformAsync has the same name as a field in an input row, then it will shadow the original field. Transforms + * that shadow fields may still refer to the fields they shadow. This can be used to transformAsync a field "in-place". * * Transforms do have some limitations. They can only refer to fields present in the actual input rows; in particular, * they cannot refer to other transforms. And they cannot remove fields, only add them. However, they can shadow a @@ -41,12 +41,12 @@ public interface Transform { /** - * Returns the field name for this transform. + * Returns the field name for this transformAsync. */ String getName(); /** - * Returns the function for this transform. The RowFunction takes an entire row as input and returns a column value + * Returns the function for this transformAsync. The RowFunction takes an entire row as input and returns a column value * as output. */ RowFunction getRowFunction(); diff --git a/processing/src/main/java/io/druid/segment/transform/TransformSpec.java b/processing/src/main/java/io/druid/segment/transform/TransformSpec.java index 706b022c7f24..e2a0d5aea705 100644 --- a/processing/src/main/java/io/druid/segment/transform/TransformSpec.java +++ b/processing/src/main/java/io/druid/segment/transform/TransformSpec.java @@ -38,9 +38,9 @@ /** * Specifies how input rows should be filtered and transforms. There are two parts: a "filter" (which can filter out * input rows) and "transforms" (which can add fields to input rows). Filters may refer to fields generated by - * a transform. + * a transformAsync. * - * See {@link Transform} for details on how each transform works. 
+ * See {@link Transform} for details on how each transformAsync works. */ public class TransformSpec { diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index 998dbb06f5aa..be5a09ab1b96 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -133,7 +133,7 @@ private SegmentsAndMetadata pushAndClear( .collect(Collectors.toList()); final ListenableFuture future = ListenableFutures - .transform( + .transformAsync( pushInBackground(null, segmentIdentifierList), this::dropInBackground ); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 569007acf11d..7837057bc956 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -268,14 +268,14 @@ public ListenableFuture publish( .map(SegmentWithState::getSegmentIdentifier) .collect(Collectors.toList()); - final ListenableFuture publishFuture = ListenableFutures.transform( + final ListenableFuture publishFuture = ListenableFutures.transformAsync( pushInBackground(wrapCommitter(committer), theSegments), sam -> publishInBackground( sam, publisher ) ); - return ListenableFutures.transform(publishFuture, sam -> { + return ListenableFutures.transformAsync(publishFuture, sam -> { synchronized (segments) { sequenceNames.forEach(segments::remove); } @@ -376,7 +376,7 @@ public ListenableFuture publishAndRegisterHandoff( ) { return ListenableFutures - .transform( + .transformAsync( publish(publisher, committer, sequenceNames), this::registerHandoff ); From ac6caa526db376420f0ca57b7843723e79e9eda5 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 27 Feb 2018 08:37:51 -0800 Subject: [PATCH 08/15] Undo intellij bad pattern matching in comments --- .../druid/data/input/parquet/GenericRecordAsMap.java | 2 +- .../main/java/io/druid/query/BitmapResultFactory.java | 2 +- .../java/io/druid/query/extraction/ExtractionFn.java | 2 +- .../java/io/druid/query/filter/DimFilterUtils.java | 4 ++-- .../src/main/java/io/druid/segment/filter/Filters.java | 2 +- .../segment/incremental/IncrementalIndexAdapter.java | 2 +- .../java/io/druid/segment/transform/Transform.java | 10 +++++----- .../java/io/druid/segment/transform/TransformSpec.java | 4 ++-- 8 files changed, 14 insertions(+), 14 deletions(-) diff --git a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/GenericRecordAsMap.java b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/GenericRecordAsMap.java index 8cbb913bb945..3f568c24e149 100644 --- a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/GenericRecordAsMap.java +++ b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/GenericRecordAsMap.java @@ -74,7 +74,7 @@ public boolean containsValue(Object value) *
  * <li>null, boolean, int, long, float, double, string, Records, Enums, Maps, Fixed -> String, using String.valueOf</li>
  * <li>bytes -> Arrays.toString() or new String if binaryAsString is true</li>
- * <li>Arrays -> List<String>, using Lists.transformAsync(<List>dimValue, TO_STRING_INCLUDING_NULL)</li>
+ * <li>Arrays -> List<String>, using Lists.transform(<List>dimValue, TO_STRING_INCLUDING_NULL)</li>
  * </ul>
  * <li> avro schema type -> druid metric:</li>
  * <ul>
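For reference, the conversion described in the javadoc above (avro Arrays -> List<String>) relies on Guava's lazy Lists.transform view. A small sketch of that idea; the TO_STRING_INCLUDING_NULL function below is a stand-in written from the javadoc's description, not the actual constant from GenericRecordAsMap:

    import com.google.common.base.Function;
    import com.google.common.collect.Lists;

    import java.util.Arrays;
    import java.util.List;

    public class AvroArrayToDimensionSketch
    {
      // Stand-in for TO_STRING_INCLUDING_NULL: String.valueOf on values, nulls kept as nulls.
      private static final Function<Object, String> TO_STRING_INCLUDING_NULL =
          input -> input == null ? null : String.valueOf(input);

      public static void main(String[] args)
      {
        final List<Object> dimValue = Arrays.asList(1, 2.5, null, "x");

        // Lazily transformed view, matching "Arrays -> List<String>" in the javadoc.
        final List<String> dimension = Lists.transform(dimValue, TO_STRING_INCLUDING_NULL);

        System.out.println(dimension); // [1, 2.5, null, x]
      }
    }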
          diff --git a/processing/src/main/java/io/druid/query/BitmapResultFactory.java b/processing/src/main/java/io/druid/query/BitmapResultFactory.java index b7ebf3ec6857..b4cffb7f67d8 100644 --- a/processing/src/main/java/io/druid/query/BitmapResultFactory.java +++ b/processing/src/main/java/io/druid/query/BitmapResultFactory.java @@ -83,7 +83,7 @@ public interface BitmapResultFactory T union(Iterable bitmapResults); /** - * Equivalent of intersection(Iterables.transformAsync(dimensionValueBitmaps, factory::wrapDimensionValue)), but doesn't + * Equivalent of intersection(Iterables.transform(dimensionValueBitmaps, factory::wrapDimensionValue)), but doesn't * create a lot of bitmap result objects. */ T unionDimensionValueBitmaps(Iterable dimensionValueBitmaps); diff --git a/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java b/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java index 93fc26490891..9f7afb8b21c4 100644 --- a/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java +++ b/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java @@ -49,7 +49,7 @@ @JsonSubTypes.Type(name = "strlen", value = StrlenExtractionFn.class) }) /** - * An ExtractionFn is a function that can be used to transformAsync the values of a column (typically a dimension). + * An ExtractionFn is a function that can be used to transform the values of a column (typically a dimension). * Note that ExtractionFn implementations are expected to be Threadsafe. * * A simple example of the type of operation this enables is the RegexDimExtractionFn which applies a diff --git a/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java b/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java index 89f7655071cb..42ee9eeba77a 100644 --- a/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java +++ b/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java @@ -88,7 +88,7 @@ static byte[] computeCacheKey(byte cacheIdKey, List filters) * @param dimFilter The filter to use * @param input The iterable of objects to be filtered * @param converter The function to convert T to ShardSpec that can be filtered by - * @param This can be any type, as long as transformAsync function is provided to convert this to ShardSpec + * @param This can be any type, as long as transform function is provided to convert this to ShardSpec * @return The set of filtered object, in the same order as input */ public static Set filterShards(DimFilter dimFilter, Iterable input, Function converter) @@ -109,7 +109,7 @@ public static Set filterShards(DimFilter dimFilter, Iterable input, Fu * @param input The iterable of objects to be filtered * @param converter The function to convert T to ShardSpec that can be filtered by * @param dimensionRangeCache The cache of RangeSets of different dimensions for the dimFilter - * @param This can be any type, as long as transformAsync function is provided to convert this to ShardSpec + * @param This can be any type, as long as transform function is provided to convert this to ShardSpec * @return The set of filtered object, in the same order as input */ public static Set filterShards(DimFilter dimFilter, Iterable input, Function converter, diff --git a/processing/src/main/java/io/druid/segment/filter/Filters.java b/processing/src/main/java/io/druid/segment/filter/Filters.java index b057e8c9ceaf..29ea6313a99e 100644 --- a/processing/src/main/java/io/druid/segment/filter/Filters.java +++ 
b/processing/src/main/java/io/druid/segment/filter/Filters.java @@ -199,7 +199,7 @@ public static ImmutableBitmap allTrue(final BitmapIndexSelector selector) */ static Iterable bitmapsFromIndexes(final IntIterable indexes, final BitmapIndex bitmapIndex) { - // Do not use Iterables.transformAsync() to avoid boxing/unboxing integers. + // Do not use Iterables.transformA() to avoid boxing/unboxing integers. return new Iterable() { @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index c80a1e5247f5..8eb148d6bb4a 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -169,7 +169,7 @@ public Iterator iterator() } /* - * Note that the transformAsync function increments a counter to determine the rowNum of + * Note that the transform function increments a counter to determine the rowNum of * the iterated Rowboats. We need to return a new iterator on each * iterator() call to ensure the counter starts at 0. */ diff --git a/processing/src/main/java/io/druid/segment/transform/Transform.java b/processing/src/main/java/io/druid/segment/transform/Transform.java index bf21552747bf..fe3b043d5197 100644 --- a/processing/src/main/java/io/druid/segment/transform/Transform.java +++ b/processing/src/main/java/io/druid/segment/transform/Transform.java @@ -23,12 +23,12 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; /** - * A row transformAsync that is part of a {@link TransformSpec}. Transforms allow adding new fields to input rows. Each + * A row transform that is part of a {@link TransformSpec}. Transforms allow adding new fields to input rows. Each * one has a "name" (the name of the new field) which can be referred to by DimensionSpecs, AggregatorFactories, etc. * Each also has a "row function", which produces values for this new field based on looking at the entire input row. * - * If a transformAsync has the same name as a field in an input row, then it will shadow the original field. Transforms - * that shadow fields may still refer to the fields they shadow. This can be used to transformAsync a field "in-place". + * If a transform has the same name as a field in an input row, then it will shadow the original field. Transforms + * that shadow fields may still refer to the fields they shadow. This can be used to transform a field "in-place". * * Transforms do have some limitations. They can only refer to fields present in the actual input rows; in particular, * they cannot refer to other transforms. And they cannot remove fields, only add them. However, they can shadow a @@ -41,12 +41,12 @@ public interface Transform { /** - * Returns the field name for this transformAsync. + * Returns the field name for this transform. */ String getName(); /** - * Returns the function for this transformAsync. The RowFunction takes an entire row as input and returns a column value + * Returns the function for this transform. The RowFunction takes an entire row as input and returns a column value * as output. 
*/ RowFunction getRowFunction(); diff --git a/processing/src/main/java/io/druid/segment/transform/TransformSpec.java b/processing/src/main/java/io/druid/segment/transform/TransformSpec.java index e2a0d5aea705..706b022c7f24 100644 --- a/processing/src/main/java/io/druid/segment/transform/TransformSpec.java +++ b/processing/src/main/java/io/druid/segment/transform/TransformSpec.java @@ -38,9 +38,9 @@ /** * Specifies how input rows should be filtered and transforms. There are two parts: a "filter" (which can filter out * input rows) and "transforms" (which can add fields to input rows). Filters may refer to fields generated by - * a transformAsync. + * a transform. * - * See {@link Transform} for details on how each transformAsync works. + * See {@link Transform} for details on how each transform works. */ public class TransformSpec { From 5b696a98cc68f40411099ad3a903e119374121e2 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 27 Feb 2018 08:38:24 -0800 Subject: [PATCH 09/15] Futrues --> Futures --- .../io/druid/java/util/common/concurrent/ListenableFutures.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java index a26360f6fd61..decafe2013c9 100644 --- a/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java +++ b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java @@ -32,7 +32,7 @@ public class ListenableFutures /** * Guava 19 changes the Futures.transform signature so that the async form is different. This is here as a * compatability layer until such a time as druid only supports Guava 19 or later, in which case - * Futrues.transformAsync should be used + * Futures.transformAsync should be used */ public static ListenableFuture transformAsync( final ListenableFuture inFuture, From 32407288de9b236e25b69800b6de7be7a28c8000 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 27 Feb 2018 08:41:27 -0800 Subject: [PATCH 10/15] Add empty iterators forbidding --- codestyle/druid-forbidden-apis.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/codestyle/druid-forbidden-apis.txt b/codestyle/druid-forbidden-apis.txt index 64c26e7c66be..bd3172fd2490 100644 --- a/codestyle/druid-forbidden-apis.txt +++ b/codestyle/druid-forbidden-apis.txt @@ -1,3 +1,4 @@ com.google.common.collect.MapMaker @ Create java.util.concurrent.ConcurrentHashMap directly com.google.common.collect.Maps#newConcurrentMap() @ Create java.util.concurrent.ConcurrentHashMap directly -com.google.common.util.concurrent.Futures.transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures.transformAsync +com.google.common.util.concurrent.Futures.transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures#transformAsync +com.google.common.collect.Iterators#emptyIterator() @ Use java.util.Collections#emptyIterator() \ No newline at end of file From fa03a75c8495a9d9ba92bd9d55c85e8090c13ddc Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 27 Feb 2018 08:42:52 -0800 Subject: [PATCH 11/15] Fix extra `A` --- processing/src/main/java/io/druid/segment/filter/Filters.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/processing/src/main/java/io/druid/segment/filter/Filters.java b/processing/src/main/java/io/druid/segment/filter/Filters.java index 29ea6313a99e..684fd2f28fb5 100644 --- a/processing/src/main/java/io/druid/segment/filter/Filters.java +++ b/processing/src/main/java/io/druid/segment/filter/Filters.java @@ -199,7 +199,7 @@ public static ImmutableBitmap allTrue(final BitmapIndexSelector selector) */ static Iterable bitmapsFromIndexes(final IntIterable indexes, final BitmapIndex bitmapIndex) { - // Do not use Iterables.transformA() to avoid boxing/unboxing integers. + // Do not use Iterables.transform() to avoid boxing/unboxing integers. return new Iterable() { @Override From 13692b86323eb778e6496556b6ea6971e159394b Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Mon, 12 Mar 2018 07:21:16 -0700 Subject: [PATCH 12/15] Correct method signature --- codestyle/druid-forbidden-apis.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codestyle/druid-forbidden-apis.txt b/codestyle/druid-forbidden-apis.txt index bd3172fd2490..fa39ed5b0e22 100644 --- a/codestyle/druid-forbidden-apis.txt +++ b/codestyle/druid-forbidden-apis.txt @@ -1,4 +1,4 @@ com.google.common.collect.MapMaker @ Create java.util.concurrent.ConcurrentHashMap directly com.google.common.collect.Maps#newConcurrentMap() @ Create java.util.concurrent.ConcurrentHashMap directly -com.google.common.util.concurrent.Futures.transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures#transformAsync +com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures#transformAsync com.google.common.collect.Iterators#emptyIterator() @ Use java.util.Collections#emptyIterator() \ No newline at end of file From cc5d60ea78137b10bddba84d8e16057b97fcdd85 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Mon, 19 Mar 2018 10:21:02 -0700 Subject: [PATCH 13/15] Address review comments --- .../java/util/common/concurrent/ListenableFutures.java | 2 ++ .../realtime/appenderator/StreamAppenderatorDriver.java | 9 ++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java index decafe2013c9..44cda8903824 100644 --- a/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java +++ b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java @@ -33,6 +33,8 @@ public class ListenableFutures * Guava 19 changes the Futures.transform signature so that the async form is different. This is here as a * compatability layer until such a time as druid only supports Guava 19 or later, in which case * Futures.transformAsync should be used + * + * This is NOT copied from guava. 
*/ public static ListenableFuture transformAsync( final ListenableFuture inFuture, diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 7837057bc956..1e3831faea02 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -375,11 +375,10 @@ public ListenableFuture publishAndRegisterHandoff( final Collection sequenceNames ) { - return ListenableFutures - .transformAsync( - publish(publisher, committer, sequenceNames), - this::registerHandoff - ); + return ListenableFutures.transformAsync( + publish(publisher, committer, sequenceNames), + this::registerHandoff + ); } @Override From 3bbe113d1aed3fb8bbe0f242bc0fa3fdbe51637e Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Mon, 19 Mar 2018 10:28:35 -0700 Subject: [PATCH 14/15] Finish Gian review comments --- .../appenderator/BatchAppenderatorDriver.java | 9 +++---- .../StreamAppenderatorDriver.java | 26 ++++++++++--------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index be5a09ab1b96..73fa2c48ff4d 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -132,11 +132,10 @@ private SegmentsAndMetadata pushAndClear( .map(SegmentWithState::getSegmentIdentifier) .collect(Collectors.toList()); - final ListenableFuture future = ListenableFutures - .transformAsync( - pushInBackground(null, segmentIdentifierList), - this::dropInBackground - ); + final ListenableFuture future = ListenableFutures.transformAsync( + pushInBackground(null, segmentIdentifierList), + this::dropInBackground + ); final SegmentsAndMetadata segmentsAndMetadata = pushAndClearTimeoutMs == 0L ? future.get() : diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 1e3831faea02..3f7fdde32913 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -20,6 +20,7 @@ package io.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -53,11 +54,11 @@ /** * This class is specialized for streaming ingestion. In streaming ingestion, the segment lifecycle is like: - *

+ *
  * <pre>
  * APPENDING -> APPEND_FINISHED -> PUBLISHED
  * </pre>
- * <p>
+ *
  * <ul>
  * <li>APPENDING: Segment is available for appending.</li>
  * <li>APPEND_FINISHED: Segment cannot be updated (data cannot be added anymore) and is waiting for being published.</li>
@@ -209,7 +210,7 @@ public void moveSegmentOut(final String sequenceName, final List + * * Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}. * * @param committer committer representing all data that has been added so far @@ -235,7 +236,7 @@ public Object persist(final Committer committer) throws InterruptedException } /** * Persist all data indexed through this driver so far. Returns a future of persisted commitMetadata. - *

            + * * Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}. * * @param committer committer representing all data that has been added so far @@ -275,14 +276,15 @@ public ListenableFuture publish( publisher ) ); - return ListenableFutures.transformAsync(publishFuture, sam -> { - synchronized (segments) { - sequenceNames.forEach(segments::remove); - } - final SettableFuture future = SettableFuture.create(); - future.set(sam); - return future; - }); + return Futures.transform( + publishFuture, + (Function) sam -> { + synchronized (segments) { + sequenceNames.forEach(segments::remove); + } + return sam; + } + ); } /** From 716c63191e9b2326dc1b9662830c5cfb40d16ddd Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Mon, 19 Mar 2018 10:41:47 -0700 Subject: [PATCH 15/15] Proper syntax from https://github.com/policeman-tools/forbidden-apis/wiki/SignaturesSyntax --- codestyle/druid-forbidden-apis.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codestyle/druid-forbidden-apis.txt b/codestyle/druid-forbidden-apis.txt index fa39ed5b0e22..96db08826006 100644 --- a/codestyle/druid-forbidden-apis.txt +++ b/codestyle/druid-forbidden-apis.txt @@ -1,4 +1,4 @@ com.google.common.collect.MapMaker @ Create java.util.concurrent.ConcurrentHashMap directly com.google.common.collect.Maps#newConcurrentMap() @ Create java.util.concurrent.ConcurrentHashMap directly -com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures#transformAsync +com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures#transformAsync com.google.common.collect.Iterators#emptyIterator() @ Use java.util.Collections#emptyIterator() \ No newline at end of file
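The publish() rewrite above also shows the other half of the convention this series lands on: when the mapping returns a plain value rather than another future, Guava's two-argument Futures.transform(ListenableFuture, Function) is still used directly, since only the AsyncFunction overload is on the forbidden-apis list. A minimal sketch of that synchronous form, with illustrative names and types (the real code maps SegmentsAndMetadata):

    import com.google.common.base.Function;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;

    import java.util.concurrent.Executors;

    public class SyncTransformSketch
    {
      public static void main(String[] args) throws Exception
      {
        final ListeningExecutorService exec =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

        // Stand-in for a publish future.
        final ListenableFuture<Integer> publishFuture = exec.submit(() -> 3);

        // The cast selects the synchronous Function overload, just as the
        // (Function) cast does in StreamAppenderatorDriver.publish() above; the
        // mapping returns a value directly once the input future completes.
        final ListenableFuture<String> labeled = Futures.transform(
            publishFuture,
            (Function<Integer, String>) count -> count + " segments published"
        );

        System.out.println(labeled.get());
        exec.shutdownNow();
      }
    }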