Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion codestyle/druid-forbidden-apis.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
com.google.common.collect.MapMaker @ Create java.util.concurrent.ConcurrentHashMap directly
com.google.common.collect.Maps#newConcurrentMap() @ Create java.util.concurrent.ConcurrentHashMap directly
com.google.common.collect.Maps#newConcurrentMap() @ Create java.util.concurrent.ConcurrentHashMap directly
com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures#transformAsync
com.google.common.collect.Iterators#emptyIterator() @ Use java.util.Collections#emptyIterator()
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public String readString(ByteBuffer in)
final int length = in.getInt();
return StringUtils.fromUtf8(readBytes(in, length));
}

public byte[] readBytes(ByteBuffer in, int length)
{
byte[] bytes = new byte[length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
Expand All @@ -48,6 +47,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -199,7 +199,7 @@ public Firehose connect(

return new Firehose()
{
private Iterator<InputRow> nextIterator = Iterators.emptyIterator();
private Iterator<InputRow> nextIterator = Collections.emptyIterator();

@Override
public boolean hasMore()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import io.druid.data.input.ByteBufferInputRowParser;
Expand All @@ -38,6 +37,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -177,7 +177,7 @@ public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object
private volatile boolean stopped;
private volatile BytesMessageWithOffset msg = null;
private volatile InputRow row = null;
private volatile Iterator<InputRow> nextIterator = Iterators.emptyIterator();
private volatile Iterator<InputRow> nextIterator = Collections.emptyIterator();

{
lastOffsetPartitions = Maps.newHashMap();
Expand Down Expand Up @@ -212,7 +212,7 @@ private void nextMessage()
msg = messageQueue.take();
final byte[] message = msg.message();
nextIterator = message == null
? Iterators.emptyIterator()
? Collections.emptyIterator()
: firehoseParser.parseBatch(ByteBuffer.wrap(message)).iterator();
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Iterators;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
Expand All @@ -46,6 +45,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -205,7 +205,7 @@ public void shutdownCompleted(ShutdownSignalException cause)
*/
private long lastDeliveryTag;

private Iterator<InputRow> nextIterator = Iterators.emptyIterator();
private Iterator<InputRow> nextIterator = Collections.emptyIterator();

@Override
public boolean hasMore()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
Expand All @@ -40,6 +39,7 @@
import javax.annotation.Nullable;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -107,7 +107,7 @@ public Firehose connect(final InputRowParser<ByteBuffer> firehoseParser, File te

return new Firehose()
{
Iterator<InputRow> nextIterator = Iterators.emptyIterator();
Iterator<InputRow> nextIterator = Collections.emptyIterator();

@Override
public boolean hasMore()
Expand Down
30 changes: 12 additions & 18 deletions indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
import io.druid.indexer.updater.HadoopDruidConverterConfig;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.FileUtils;
Expand Down Expand Up @@ -90,6 +88,7 @@ public static Path distributedClassPath(Path base)
{
return new Path(base, "classpath");
}

public static final String INDEX_ZIP = "index.zip";
public static final String DESCRIPTOR_JSON = "descriptor.json";

Expand Down Expand Up @@ -277,17 +276,9 @@ static void addSnapshotJarToClassPath(
static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException
{
log.info("Uploading jar to path[%s]", path);
ByteStreams.copy(
Files.newInputStreamSupplier(jarFile),
new OutputSupplier<OutputStream>()
{
@Override
public OutputStream getOutput() throws IOException
{
return fs.create(path);
}
}
);
try (OutputStream os = fs.create(path)) {
Files.asByteSource(jarFile).copyTo(os);
}
}

static boolean isSnapshot(File jarFile)
Expand Down Expand Up @@ -562,8 +553,10 @@ public static Path makeFileNamePath(
DataSegmentPusher dataSegmentPusher
)
{
return new Path(prependFSIfNullScheme(fs, basePath),
dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName));
return new Path(
prependFSIfNullScheme(fs, basePath),
dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName)
);
}

public static Path makeTmpPath(
Expand All @@ -576,9 +569,10 @@ public static Path makeTmpPath(
{
return new Path(
prependFSIfNullScheme(fs, basePath),
StringUtils.format("./%s.%d",
dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP),
taskAttemptID.getId()
StringUtils.format(
"./%s.%d",
dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP),
taskAttemptID.getId()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.ListenableFutures;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.emitter.EmittingLogger;
Expand Down Expand Up @@ -388,9 +389,9 @@ public RealtimeAppenderatorIngestionSpec getSpec()
/**
* Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than
* abruptly stopping.
*
* <p>
* This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this.
*
* <p>
* Protected for tests.
*/
protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)
Expand Down Expand Up @@ -431,19 +432,16 @@ private void publishSegments(
String sequenceName
)
{
ListenableFuture<SegmentsAndMetadata> publishFuture = driver.publish(
final ListenableFuture<SegmentsAndMetadata> publishFuture = driver.publish(
publisher,
committerSupplier.get(),
Collections.singletonList(sequenceName)
);

ListenableFuture<SegmentsAndMetadata> handoffFuture = Futures.transform(publishFuture, driver::registerHandoff);

pendingHandoffs.add(handoffFuture);
pendingHandoffs.add(ListenableFutures.transformAsync(publishFuture, driver::registerHandoff));
}

private void waitForSegmentPublishAndHandoff(long timeout) throws InterruptedException, ExecutionException,
TimeoutException
TimeoutException
{
if (!pendingHandoffs.isEmpty()) {
ListenableFuture<?> allHandoffs = Futures.allAsList(pendingHandoffs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testSimple() throws Exception

final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
for (Map.Entry<Long, String> entry : expected.entrySet()) {
final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().openStream());
final String string = StringUtils.fromUtf8(bytes);
Assert.assertEquals(StringUtils.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.java.util.common.concurrent;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

import javax.annotation.Nullable;
import java.util.function.Function;

public class ListenableFutures
{
/**
* Guava 19 changes the Futures.transform signature so that the async form is different. This is here as a
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this code copied from Guava? If so, the comment should say so. If not, the comment should also say so, since people might think it was anyway.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

* compatability layer until such a time as druid only supports Guava 19 or later, in which case
* Futures.transformAsync should be used
*
* This is NOT copied from guava.
*/
public static <I, O> ListenableFuture<O> transformAsync(
final ListenableFuture<I> inFuture,
final Function<I, ListenableFuture<O>> transform
)
{
final SettableFuture<O> finalFuture = SettableFuture.create();
Futures.addCallback(inFuture, new FutureCallback<I>()
{
@Override
public void onSuccess(@Nullable I result)
{
final ListenableFuture<O> transformFuture = transform.apply(result);
Futures.addCallback(transformFuture, new FutureCallback<O>()
{
@Override
public void onSuccess(@Nullable O result)
{
finalFuture.set(result);
}

@Override
public void onFailure(Throwable t)
{
finalFuture.setException(t);
}
});
}

@Override
public void onFailure(Throwable t)
{
finalFuture.setException(t);
}
});
return finalFuture;
}
}
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,11 @@
<artifactId>guice-multibindings</artifactId>
<version>${guice.version}</version>
</dependency>
<dependency>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this? Why this is needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future versions of guava fail to pull this in. 19 specifically if I recall. This causes builds to fail for guava 19 without it.

<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions processing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
Expand Down Expand Up @@ -57,6 +56,7 @@
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -320,7 +320,7 @@ public RowIterator(GroupByQuery query, final Cursor cursor, ByteBuffer metricsBu
this.maxIntermediateRows = querySpecificConfig.getMaxIntermediateRows();

unprocessedKeys = null;
delegate = Iterators.emptyIterator();
delegate = Collections.emptyIterator();
dimensionSpecs = query.getDimensions();
dimensions = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());
dimNames = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package io.druid.query.groupby.epinephelinae;

import com.google.common.base.Supplier;
import com.google.common.collect.Iterators;
import io.druid.java.util.common.CloseableIterators;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.parsers.CloseableIterator;
Expand Down Expand Up @@ -170,7 +169,7 @@ public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
if (!initialized) {
// it's possible for iterator() to be called before initialization when
// a nested groupBy's subquery has an empty result set (see testEmptySubquery() in GroupByQueryRunnerTest)
return CloseableIterators.withEmptyBaggage(Iterators.<Entry<KeyType>>emptyIterator());
return CloseableIterators.withEmptyBaggage(Collections.<Entry<KeyType>>emptyIterator());
}

if (sorted) {
Expand Down
Loading