Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void finishJob()

try {
// User should have persisted everything by now.
Preconditions.checkState(!theSink.swappable(), "All data must be persisted before fininshing the job!");
Preconditions.checkState(!theSink.swappable(), "All data must be persisted before finishing the job!");

if (spilled.size() == 0) {
throw new IllegalStateException("Nothing indexed?");
Expand Down Expand Up @@ -225,8 +225,8 @@ public void finishJob()

private void spillIfSwappable()
{
if (theSink.swappable()) {
final FireHydrant indexToPersist = theSink.swap();
final FireHydrant indexToPersist = theSink.swap();
if (indexToPersist != null) {
final int rowsToPersist = indexToPersist.getIndex().size();
final File dirToPersist = getSpillDir(indexToPersist.getCount());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ static RealtimeTuningConfig convertTuningConfig(
null,
null,
null,
null,
shardSpec,
indexSpec,
buildV9Directly,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,7 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportPa
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
1000,
null,
new Period("P1Y"),
new Period("PT10M"),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ public Plumber findPlumber(

new RealtimeTuningConfig(
1,
null,
new Period("PT10M"),
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,7 @@ private RealtimeIndexTask newRealtimeIndexTask()
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
1000,
null,
new Period("P1Y"),
null, //default window period of 10 minutes
null, // base persist dir ignored by Realtime Index task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,13 @@ private void innerClose() throws IOException
baseSegment.close();
}
}

/**
 * Builds a short description of this object for logs and debugging: the
 * identifier reported by {@code baseSegment} followed by the current value of
 * {@code numReferences}.
 */
@Override
public String toString()
{
  final StringBuilder text = new StringBuilder("ReferenceCountingSegment{");
  text.append("baseSegment=").append(baseSegment.getIdentifier());
  text.append(", numReferences=").append(numReferences);
  text.append('}');
  return text.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Floats;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.parsers.ParseException;
import io.druid.common.utils.StringUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
Expand Down Expand Up @@ -356,6 +358,7 @@ public int lookupId(String name)
private final List<Function<InputRow, InputRow>> rowTransformers;
private final AggregatorFactory[] metrics;
private final AggregatorType[] aggs;
private final int maxLengthForAggregators;
private final boolean deserializeComplexMetrics;
private final boolean reportParseExceptions;
private final boolean sortFacts;
Expand Down Expand Up @@ -450,19 +453,24 @@ public IncrementalIndex(
if (!spatialDimensions.isEmpty()) {
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
}
int length = 0;
for (AggregatorFactory factory : metrics) {
length += factory.getMaxIntermediateSize();
}
this.maxLengthForAggregators = length;
}

private DimDim newDimDim(String dimension, ValueType type) {
DimDim newDimDim;
switch (type) {
case LONG:
newDimDim = makeDimDim(dimension, getDimensionDescs());
newDimDim = makeDimDim(dimension, SizeEstimator.LONG, getDimensionDescs());
break;
case FLOAT:
newDimDim = makeDimDim(dimension, getDimensionDescs());
newDimDim = makeDimDim(dimension, SizeEstimator.FLOAT, getDimensionDescs());
break;
case STRING:
newDimDim = new NullValueConverterDimDim(makeDimDim(dimension, getDimensionDescs()));
newDimDim = new NullValueConverterDimDim(makeDimDim(dimension, SizeEstimator.STRING, getDimensionDescs()));
break;
default:
throw new IAE("Invalid column type: " + type);
Expand All @@ -471,7 +479,7 @@ private DimDim newDimDim(String dimension, ValueType type) {
}

// use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation
protected abstract DimDim makeDimDim(String dimension, Object lock);
protected abstract DimDim makeDimDim(String dimension, SizeEstimator estimator, Object lock);

public abstract ConcurrentMap<TimeAndDims, Integer> getFacts();

Expand Down Expand Up @@ -692,6 +700,15 @@ public int size()
return numEntries.get();
}

public int estimatedOccupation()
{
int occupation = maxLengthForAggregators * getFacts().size();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is meant to check the heap occupation. This is a great change; it's not awesomely accurate, but it's probably better than guessing at maxRowsInMemory.

It'd be good to have a comment about the ways in which the estimate can be inaccurate:

  • maxLengthForAggregators is based on max offheap size, which may not apply to onheap aggregations.
  • dimension estimation is based on dictionary size only, not facts.
  • likely to be an underestimate rather than an overestimate.

Also, the heap footprint is really different depending on which IncrementalIndex implementation is being used, so maybe it's best to have IncrementalIndex simply return 0 and move this implementation to OnheapIncrementalIndex. I think the other incremental index implementations would never have this method called (because Sink always uses OnheapIncrementalIndex).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May also be nice to add in estimation for the facts part of the dimensions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with all of your comments. This is only an estimate, so it can be relative (if it turns out to be half of the real occupation, we can simply set the limit to half) and is advisory at best rather than exact.

  • for "maxLengthForAggregators", I'm just praying it's similar to the on-heap aggregator size (or should we add a method to AggregatorFactory?)
  • for "dimension estimation", would it be enough to add Ints.BYTES for the occupation of the index?
  • for the "underestimate" part: yes, I've thought of that, which is why I chose a rather small value for "defaultMaxOccupationInMemory", which you commented on next
  • for the "IncrementalIndex implementation" part, I don't think memory usage differs critically between the current implementations, and we only use the on-heap one for realtime indexing anyway. But ideally that would be the best way to do it, I think.

for (DimensionDesc dimensionDesc : dimensionDescs.values()) {
occupation += dimensionDesc.getValues().estimatedSize();
}
return occupation;
}

private long getMinTimeMillis()
{
if (sortFacts) {
Expand Down Expand Up @@ -1050,6 +1067,36 @@ public ColumnCapabilitiesImpl getCapabilities()
}
}

/**
 * Strategy for approximating how much space a single dimension value takes.
 * One shared instance is provided per supported value type; each of them
 * reports 0 for a {@code null} value.
 */
static interface SizeEstimator<T>
{
  /**
   * @param object the value to measure
   * @return the estimated size of {@code object}
   */
  int estimate(T object);

  /** Measures a string with {@code StringUtils.estimatedBinaryLengthAsUTF8}. */
  SizeEstimator<String> STRING = new SizeEstimator<String>()
  {
    @Override
    public int estimate(String text)
    {
      if (text == null) {
        return 0;
      }
      return StringUtils.estimatedBinaryLengthAsUTF8(text);
    }
  };

  /** Counts every non-null float as {@code Floats.BYTES}. */
  SizeEstimator<Float> FLOAT = new SizeEstimator<Float>()
  {
    @Override
    public int estimate(Float number)
    {
      if (number == null) {
        return 0;
      }
      return Floats.BYTES;
    }
  };

  /** Counts every non-null long as {@code Longs.BYTES}. */
  SizeEstimator<Long> LONG = new SizeEstimator<Long>()
  {
    @Override
    public int estimate(Long number)
    {
      if (number == null) {
        return 0;
      }
      return Longs.BYTES;
    }
  };
}

static interface DimDim<T extends Comparable<? super T>>
{
public int getId(T value);
Expand All @@ -1064,6 +1111,8 @@ static interface DimDim<T extends Comparable<? super T>>

public T getMaxValue();

public int estimatedSize();

public int add(T value);

public SortedDimLookup sort();
Expand Down Expand Up @@ -1128,6 +1177,12 @@ public String getMaxValue()
return Strings.nullToEmpty(delegate.getMaxValue());
}

/**
 * Passes through the size estimate of the wrapped {@code delegate} without
 * adjusting it.
 */
@Override
public int estimatedSize()
{
  final int delegateEstimate = delegate.estimatedSize();
  return delegateEstimate;
}

@Override
public int add(String value)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ public ConcurrentMap<TimeAndDims, Integer> getFacts()
}

@Override
protected DimDim makeDimDim(String dimension, Object lock)
protected DimDim makeDimDim(String dimension, SizeEstimator estimator, Object lock)
{
return new OnheapIncrementalIndex.OnHeapDimDim(lock);
return new OnheapIncrementalIndex.OnHeapDimDim(estimator, lock);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
Expand Down Expand Up @@ -132,9 +133,9 @@ public ConcurrentMap<TimeAndDims, Integer> getFacts()
}

@Override
protected DimDim makeDimDim(String dimension, Object lock)
protected DimDim makeDimDim(String dimension, SizeEstimator estimator, Object lock)
{
return new OnHeapDimDim(lock);
return new OnHeapDimDim(estimator, lock);
}

@Override
Expand Down Expand Up @@ -326,12 +327,17 @@ static class OnHeapDimDim<T extends Comparable<? super T>> implements DimDim<T>
private T minValue = null;
private T maxValue = null;

private int estimatedSize;

private final List<T> idToValue = Lists.newArrayList();
private final Object lock;

public OnHeapDimDim(Object lock)
private final SizeEstimator<T> estimator;

public OnHeapDimDim(SizeEstimator<T> estimator, Object lock)
{
this.lock = lock;
this.estimator = estimator;
}

public int getId(T value)
Expand Down Expand Up @@ -368,13 +374,15 @@ public int add(T value)
synchronized (lock) {
Integer prev = valueToId.get(value);
if (prev != null) {
estimatedSize += Ints.BYTES;
return prev;
}
final int index = size();
valueToId.put(value, index);
idToValue.add(value);
minValue = minValue == null || minValue.compareTo(value) > 0 ? value : minValue;
maxValue = maxValue == null || maxValue.compareTo(value) < 0 ? value : maxValue;
estimatedSize += estimator.estimate(value) + Ints.BYTES;
return index;
}
}
Expand All @@ -391,6 +399,12 @@ public T getMaxValue()
return maxValue;
}

/**
 * Returns the running total kept in the {@code estimatedSize} field.
 * NOTE(review): the field is read here without taking {@code lock} — confirm
 * that a slightly stale value is acceptable to callers.
 */
@Override
public int estimatedSize()
{
  return this.estimatedSize;
}

public OnHeapDimLookup sort()
{
synchronized (lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
{
private static final int defaultMaxRowsInMemory = 75000;
private static final int defaultMaxOccupationInMemory = 64 << 20;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for this default? It seems small, perhaps default it to 1/4 of the heap size instead.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think "estimatedOccupation" is basically an underestimate. It's inherently hard to estimate memory usage in Java, and it can differ between environments (JVM version, OS, etc.). "If it cannot be exact, let it have a conservative default value and make it configurable by users" — that's what I thought.

private static final Period defaultIntermediatePersistPeriod = new Period("PT10M");
private static final Period defaultWindowPeriod = new Period("PT10M");
private static final VersioningPolicy defaultVersioningPolicy = new IntervalStartVersioningPolicy();
Expand All @@ -61,6 +62,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis
{
return new RealtimeTuningConfig(
defaultMaxRowsInMemory,
defaultMaxOccupationInMemory,
defaultIntermediatePersistPeriod,
defaultWindowPeriod,
basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory,
Expand All @@ -78,6 +80,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis
}

private final int maxRowsInMemory;
private final int maxOccupationInMemory;
private final Period intermediatePersistPeriod;
private final Period windowPeriod;
private final File basePersistDirectory;
Expand All @@ -95,6 +98,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis
@JsonCreator
public RealtimeTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxOccupationInMemory") Integer maxOccupationInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
Expand All @@ -111,6 +115,7 @@ public RealtimeTuningConfig(
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
this.maxOccupationInMemory = maxOccupationInMemory == null ? defaultMaxOccupationInMemory : maxOccupationInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaultIntermediatePersistPeriod
: intermediatePersistPeriod;
Expand All @@ -134,6 +139,7 @@ public RealtimeTuningConfig(
? defaultHandoffConditionTimeout
: handoffConditionTimeout;
Preconditions.checkArgument(this.handoffConditionTimeout >= 0, "handoffConditionTimeout must be >= 0");
Preconditions.checkArgument(this.maxRowsInMemory > 0, "maxRowsInMemory must be > 0");
}

@JsonProperty
Expand All @@ -142,6 +148,12 @@ public int getMaxRowsInMemory()
return maxRowsInMemory;
}

/**
 * Exposes the {@code maxOccupationInMemory} value held by this config as a
 * JSON property.
 */
@JsonProperty
public int getMaxOccupationInMemory()
{
  return this.maxOccupationInMemory;
}

@JsonProperty
public Period getIntermediatePersistPeriod()
{
Expand Down Expand Up @@ -224,6 +236,7 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
maxRowsInMemory,
maxOccupationInMemory,
intermediatePersistPeriod,
windowPeriod,
basePersistDirectory,
Expand All @@ -244,6 +257,7 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir)
{
return new RealtimeTuningConfig(
maxRowsInMemory,
maxOccupationInMemory,
intermediatePersistPeriod,
windowPeriod,
dir,
Expand Down
Loading