Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
9359961
Various changes
leventov Jul 6, 2018
ff5141d
Fixes
leventov Jul 6, 2018
e768315
Fixes
leventov Jul 7, 2018
9dce337
Null fix
leventov Jul 8, 2018
b5ef733
Fix inspection:
leventov Jul 9, 2018
ddef55d
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Jul 14, 2018
9c3f4f6
Fix in JavaScriptAggregatorFactory and License header fixes
leventov Jul 14, 2018
1bf6471
Fix checkstyle
leventov Jul 14, 2018
b01a43f
Close dimension value lookups in StringDimensionMergerV9
leventov Jul 15, 2018
407890e
Suppress MustBeClosedChecker
leventov Jul 15, 2018
bd10c18
Close CloseableIndexed in tests
leventov Jul 15, 2018
560f02f
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Jul 21, 2018
d5bff2a
Address review
leventov Jul 21, 2018
13383f3
Removed unused getClazz() methods; Fixed a performance bug in ObjectS…
leventov Jul 21, 2018
79aaff8
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Jul 27, 2018
69cf3b8
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Aug 3, 2018
3eab0a8
Fix SingleStringInputDimensionSelector
leventov Aug 3, 2018
4e72cb2
Rename ColumnSerializer to GenericColumnSerializer
leventov Aug 3, 2018
dac2c4f
Address comments
leventov Aug 10, 2018
5c62f63
Merge branch 'master' into various-changes
leventov Aug 30, 2018
0359f4d
Comments
leventov Aug 30, 2018
7127cea
Remove DoubleGenericColumnPartSerde
leventov Aug 31, 2018
1c4c5fe
Deadlock watching in CuratorDruidCoordinatorTest
leventov Aug 31, 2018
7765522
Enable looking for stuck thread in CuratorDruidCoordinatorTest
leventov Sep 3, 2018
ea7a341
Enable looking for stuck thread in RemoteTaskRunnerTest
leventov Sep 3, 2018
d31453b
Use JUnit's TemporaryFolder in PrefetchableTextFilesFirehoseFactoryTest
leventov Sep 4, 2018
2ebc85c
Increase timeout in PrefetchableTextFilesFirehoseFactoryTest
leventov Sep 4, 2018
186b855
KafkaSupervisor.awaitAllNoticesHandled
leventov Sep 4, 2018
316e33b
Add DeadlockDetectingTimeout TestRule
leventov Sep 4, 2018
0375f78
Revert "KafkaSupervisor.awaitAllNoticesHandled"
leventov Sep 4, 2018
2a79e3f
Fix brace
leventov Sep 4, 2018
e460e3e
Don't miss fetchFutures in Fetcher
leventov Sep 5, 2018
7668da5
volatile in CuratorDruidCoordinatorTest
leventov Sep 5, 2018
5878b0a
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Sep 7, 2018
9597044
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Sep 12, 2018
6fd6e44
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Sep 14, 2018
4949a83
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Sep 14, 2018
df517ca
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Sep 22, 2018
d02b6b6
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Sep 25, 2018
bd179f6
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Sep 26, 2018
f387ce8
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Sep 26, 2018
5b018cc
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Sep 29, 2018
4eb7f5c
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Oct 1, 2018
c0f7ceb
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Oct 1, 2018
17d0333
Merge remote-tracking branch 'upstream/master' into various-changes
leventov Oct 2, 2018
5fff237
Fix test
leventov Oct 2, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
15 changes: 12 additions & 3 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/src/main/java/org/apache/druid/data/input/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.guice.annotations.PublicApi;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.List;

/**
Expand Down Expand Up @@ -71,6 +72,7 @@ public interface Row extends Comparable<Row>
*
* @return the value of the provided column name
*/
@Nullable
Object getRaw(String dimension);

/**
Expand All @@ -79,5 +81,6 @@ public interface Row extends Comparable<Row>
* 1. If the column is absent in the row, numeric zero is returned, rather than null.
* 2. If the column has string value, an attempt is made to parse this value as a number.
*/
@Nullable
Number getMetric(String metric);
}
17 changes: 3 additions & 14 deletions api/src/main/java/org/apache/druid/data/input/Rows.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,16 @@
import org.apache.druid.java.util.common.parsers.ParseException;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
*/
public class Rows
{
public static final Long LONG_ZERO = 0L;

/**
* @param timeStamp rollup up timestamp to be used to create group key
Expand All @@ -55,10 +54,7 @@ public static List<Object> toGroupKey(long timeStamp, InputRow inputRow)
dims.put(dim, dimValues);
}
}
return ImmutableList.of(
timeStamp,
dims
);
return ImmutableList.of(timeStamp, dims);
}

/**
Expand All @@ -70,14 +66,7 @@ public static List<String> objectToStrings(final Object inputValue)
return Collections.emptyList();
} else if (inputValue instanceof List) {
// guava's toString function fails on null objects, so please do not use it
final List<Object> values = (List) inputValue;

final List<String> retVal = new ArrayList<>(values.size());
for (Object val : values) {
retVal.add(String.valueOf(val));
}

return retVal;
return ((List<?>) inputValue).stream().map(String::valueOf).collect(Collectors.toList());
} else {
return Collections.singletonList(String.valueOf(inputValue));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;

import java.io.File;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.apache.druid.data.input.impl;

import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.utils.Runnables;
import org.apache.commons.io.LineIterator;

import javax.annotation.Nullable;
import java.io.Closeable;
Expand Down Expand Up @@ -101,24 +101,9 @@ public Runnable commit()
@Override
public void close() throws IOException
{
try {
if (lineIterator != null) {
lineIterator.close();
}
}
catch (Throwable t) {
try {
if (closer != null) {
closer.close();
}
}
catch (Exception e) {
t.addSuppressed(e);
}
throw t;
}
if (closer != null) {
closer.close();
try (Closeable ignore = closer;
Closeable ignore2 = lineIterator != null ? lineIterator::close : null) {
// close both via try-with-resources
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -62,7 +64,7 @@ public abstract class Fetcher<T> implements Iterator<OpenedObject<T>>
// This is updated when a file is successfully fetched, a fetched file is deleted, or a fetched file is
// cached.
private final AtomicLong fetchedBytes = new AtomicLong(0);
private Future<Void> fetchFuture;
private final Deque<Future<Void>> fetchFutures = new ArrayDeque<>();
private PrefetchConfig prefetchConfig;

// nextFetchIndex indicates which object should be downloaded when fetch is triggered.
Expand Down Expand Up @@ -103,12 +105,13 @@ public abstract class Fetcher<T> implements Iterator<OpenedObject<T>>
*/
private void fetchIfNeeded(long remainingBytes)
{
if ((fetchFuture == null || fetchFuture.isDone())
if ((fetchFutures.isEmpty() || fetchFutures.peekLast().isDone())
&& remainingBytes <= prefetchConfig.getPrefetchTriggerBytes()) {
fetchFuture = fetchExecutor.submit(() -> {
Future<Void> fetchFuture = fetchExecutor.submit(() -> {
fetch();
return null;
});
fetchFutures.add(fetchFuture);
}
}

Expand Down Expand Up @@ -180,12 +183,17 @@ public OpenedObject<T> next()
private void checkFetchException(boolean wait)
{
try {
if (wait) {
fetchFuture.get(prefetchConfig.getFetchTimeout(), TimeUnit.MILLISECONDS);
fetchFuture = null;
} else if (fetchFuture != null && fetchFuture.isDone()) {
fetchFuture.get();
fetchFuture = null;
for (Future<Void> fetchFuture; (fetchFuture = fetchFutures.poll()) != null; ) {
if (wait) {
fetchFuture.get(prefetchConfig.getFetchTimeout(), TimeUnit.MILLISECONDS);
} else {
if (fetchFuture.isDone()) {
fetchFuture.get();
} else {
fetchFutures.addFirst(fetchFuture);
break;
}
}
}
}
catch (InterruptedException | ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
package org.apache.druid.data.input.impl.prefetch;

import com.google.common.base.Predicate;

import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.commons.io.IOUtils;

import javax.annotation.Nullable;
import java.io.Closeable;
Expand All @@ -33,7 +32,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

import java.util.concurrent.ExecutorService;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.SqlFirehose;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.LineIterator;

import javax.annotation.Nullable;
import java.io.File;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
import org.apache.druid.data.input.impl.FileIteratingFirehose;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.commons.io.LineIterator;

import javax.annotation.Nullable;
import java.io.Closeable;
Expand Down Expand Up @@ -221,21 +221,24 @@ public LineIterator next()
}

final OpenedObject<T> openedObject = fetcher.next();
final InputStream stream;
try {
stream = wrapObjectStream(
openedObject.getObject(),
openedObject.getObjectStream()
return new ResourceCloseableLineIterator(
new InputStreamReader(
wrapObjectStream(openedObject.getObject(), openedObject.getObjectStream()),
StandardCharsets.UTF_8
),
openedObject.getResourceCloser()
);
}
catch (IOException e) {
try {
openedObject.getResourceCloser().close();
}
catch (Throwable t) {
e.addSuppressed(t);
}
throw new RuntimeException(e);
}

return new ResourceCloseableLineIterator(
new InputStreamReader(stream, StandardCharsets.UTF_8),
openedObject.getResourceCloser()
);
}
},
firehoseParser,
Expand Down Expand Up @@ -288,9 +291,8 @@ static class ResourceCloseableLineIterator extends LineIterator
@Override
public void close()
{
super.close();
try {
resourceCloser.close();
try (Closeable ignore = this.resourceCloser) {
super.close();
}
catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2ObjectArrayMap;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.jackson.CommaListJoinDeserializer;
import org.apache.druid.jackson.CommaListJoinSerializer;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import it.unimi.dsi.fastutil.objects.Object2ObjectArrayMap;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.InputRow;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.apache.druid.data.input.impl;

import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.parsers.Parser;
import junit.framework.Assert;
import org.apache.druid.java.util.common.parsers.Parser;
import org.junit.Test;

import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.prefetch.JsonIterator;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down
Loading