diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 02d256c55d6d..893afb611815 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -175,7 +175,7 @@ public void finishJob() try { // User should have persisted everything by now. - Preconditions.checkState(!theSink.swappable(), "All data must be persisted before fininshing the job!"); + Preconditions.checkState(!theSink.swappable(), "All data must be persisted before finishing the job!"); if (spilled.size() == 0) { throw new IllegalStateException("Nothing indexed?"); @@ -225,8 +225,8 @@ public void finishJob() private void spillIfSwappable() { - if (theSink.swappable()) { - final FireHydrant indexToPersist = theSink.swap(); + final FireHydrant indexToPersist = theSink.swap(); + if (indexToPersist != null) { final int rowsToPersist = indexToPersist.getIndex().size(); final File dirToPersist = getSpillDir(indexToPersist.getCount()); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 4918af9e4025..556b8b02caaa 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -138,6 +138,7 @@ static RealtimeTuningConfig convertTuningConfig( null, null, null, + null, shardSpec, indexSpec, buildV9Directly, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index f71327daaeea..758689306d23 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -898,6 +898,7 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportPa ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( 1000, + null, new Period("P1Y"), new Period("PT10M"), null, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 86b584ca8c5e..aeab2c96b104 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -325,6 +325,7 @@ public Plumber findPlumber( new RealtimeTuningConfig( 1, + null, new Period("PT10M"), null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index d85e586d8afe..19349198f833 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -1177,6 +1177,7 @@ private RealtimeIndexTask newRealtimeIndexTask() ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( 1000, + null, new Period("P1Y"), null, //default window period of 10 minutes null, // base persist dir ignored by Realtime Index task diff --git a/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java index 9eabf277f0b7..bd9d5cb8e305 100644 --- a/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java +++ b/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java @@ -181,4 +181,13 @@ private void innerClose() throws IOException baseSegment.close(); } } + + @Override + public String toString() + { + return "ReferenceCountingSegment{" + + "baseSegment=" + baseSegment.getIdentifier() + + ", numReferences=" + numReferences + + '}'; + } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 271107534850..99f5cace022a 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -29,11 +29,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.primitives.Floats; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.parsers.ParseException; +import io.druid.common.utils.StringUtils; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -356,6 +358,7 @@ public int lookupId(String name) private final List> rowTransformers; private final AggregatorFactory[] metrics; private final AggregatorType[] aggs; + private final int maxLengthForAggregators; private final boolean deserializeComplexMetrics; private final boolean reportParseExceptions; private final boolean sortFacts; @@ -450,19 +453,24 @@ public IncrementalIndex( if (!spatialDimensions.isEmpty()) { this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions)); } + int length = 0; + for (AggregatorFactory factory : metrics) { + length += factory.getMaxIntermediateSize(); + } + this.maxLengthForAggregators = length; } private DimDim newDimDim(String dimension, ValueType type) { DimDim newDimDim; switch (type) { case LONG: - newDimDim = makeDimDim(dimension, getDimensionDescs()); + newDimDim = makeDimDim(dimension, SizeEstimator.LONG, getDimensionDescs()); break; case FLOAT: - newDimDim = makeDimDim(dimension, getDimensionDescs()); + newDimDim = makeDimDim(dimension, SizeEstimator.FLOAT, getDimensionDescs()); break; case STRING: - newDimDim = new NullValueConverterDimDim(makeDimDim(dimension, getDimensionDescs())); + newDimDim = new NullValueConverterDimDim(makeDimDim(dimension, SizeEstimator.STRING, getDimensionDescs())); break; default: throw new IAE("Invalid column type: " + type); @@ -471,7 +479,7 @@ private DimDim newDimDim(String dimension, ValueType type) { } // use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation - protected abstract DimDim makeDimDim(String dimension, Object lock); + protected abstract DimDim makeDimDim(String dimension, SizeEstimator estimator, Object lock); public abstract ConcurrentMap getFacts(); @@ -692,6 +700,15 @@ public int size() return numEntries.get(); } + public int estimatedOccupation() + { + int occupation = maxLengthForAggregators * getFacts().size(); + for (DimensionDesc dimensionDesc : dimensionDescs.values()) { + occupation += dimensionDesc.getValues().estimatedSize(); + } + return occupation; + } + private long getMinTimeMillis() { if (sortFacts) { @@ -1050,6 +1067,36 @@ public ColumnCapabilitiesImpl getCapabilities() } } + static interface SizeEstimator { + + int estimate(T object); + + SizeEstimator STRING = new SizeEstimator() + { + @Override + public int estimate(String object) + { + return object == null ? 0 : StringUtils.estimatedBinaryLengthAsUTF8(object); + } + }; + SizeEstimator FLOAT = new SizeEstimator() + { + @Override + public int estimate(Float object) + { + return object == null ? 0 : Floats.BYTES; + } + }; + SizeEstimator LONG = new SizeEstimator() + { + @Override + public int estimate(Long object) + { + return object == null ? 0 : Longs.BYTES; + } + }; + } + static interface DimDim> { public int getId(T value); @@ -1064,6 +1111,8 @@ static interface DimDim> public T getMaxValue(); + public int estimatedSize(); + public int add(T value); public SortedDimLookup sort(); @@ -1128,6 +1177,12 @@ public String getMaxValue() return Strings.nullToEmpty(delegate.getMaxValue()); } + @Override + public int estimatedSize() + { + return delegate.estimatedSize(); + } + @Override public int add(String value) { diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 6dbb924a8f89..275108fd4293 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -151,9 +151,9 @@ public ConcurrentMap getFacts() } @Override - protected DimDim makeDimDim(String dimension, Object lock) + protected DimDim makeDimDim(String dimension, SizeEstimator estimator, Object lock) { - return new OnheapIncrementalIndex.OnHeapDimDim(lock); + return new OnheapIncrementalIndex.OnHeapDimDim(estimator, lock); } @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 98a6361c9cbb..dad02b623548 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -22,6 +22,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.primitives.Ints; import com.metamx.common.logger.Logger; import com.metamx.common.parsers.ParseException; import io.druid.data.input.InputRow; @@ -132,9 +133,9 @@ public ConcurrentMap getFacts() } @Override - protected DimDim makeDimDim(String dimension, Object lock) + protected DimDim makeDimDim(String dimension, SizeEstimator estimator, Object lock) { - return new OnHeapDimDim(lock); + return new OnHeapDimDim(estimator, lock); } @Override @@ -326,12 +327,17 @@ static class OnHeapDimDim> implements DimDim private T minValue = null; private T maxValue = null; + private int estimatedSize; + private final List idToValue = Lists.newArrayList(); private final Object lock; - public OnHeapDimDim(Object lock) + private final SizeEstimator estimator; + + public OnHeapDimDim(SizeEstimator estimator, Object lock) { this.lock = lock; + this.estimator = estimator; } public int getId(T value) @@ -368,6 +374,7 @@ public int add(T value) synchronized (lock) { Integer prev = valueToId.get(value); if (prev != null) { + estimatedSize += Ints.BYTES; return prev; } final int index = size(); @@ -375,6 +382,7 @@ public int add(T value) idToValue.add(value); minValue = minValue == null || minValue.compareTo(value) > 0 ? value : minValue; maxValue = maxValue == null || maxValue.compareTo(value) < 0 ? value : maxValue; + estimatedSize += estimator.estimate(value) + Ints.BYTES; return index; } } @@ -391,6 +399,12 @@ public T getMaxValue() return maxValue; } + @Override + public int estimatedSize() + { + return estimatedSize; + } + public OnHeapDimLookup sort() { synchronized (lock) { diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 14dc723e9add..52f2262b81d4 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -40,6 +40,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig { private static final int defaultMaxRowsInMemory = 75000; + private static final int defaultMaxOccupationInMemory = 64 << 20; private static final Period defaultIntermediatePersistPeriod = new Period("PT10M"); private static final Period defaultWindowPeriod = new Period("PT10M"); private static final VersioningPolicy defaultVersioningPolicy = new IntervalStartVersioningPolicy(); @@ -61,6 +62,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis { return new RealtimeTuningConfig( defaultMaxRowsInMemory, + defaultMaxOccupationInMemory, defaultIntermediatePersistPeriod, defaultWindowPeriod, basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, @@ -78,6 +80,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis } private final int maxRowsInMemory; + private final int maxOccupationInMemory; private final Period intermediatePersistPeriod; private final Period windowPeriod; private final File basePersistDirectory; @@ -95,6 +98,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis @JsonCreator public RealtimeTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + @JsonProperty("maxOccupationInMemory") Integer maxOccupationInMemory, @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @@ -111,6 +115,7 @@ public RealtimeTuningConfig( ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; + this.maxOccupationInMemory = maxOccupationInMemory == null ? defaultMaxOccupationInMemory : maxOccupationInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? defaultIntermediatePersistPeriod : intermediatePersistPeriod; @@ -134,6 +139,7 @@ public RealtimeTuningConfig( ? defaultHandoffConditionTimeout : handoffConditionTimeout; Preconditions.checkArgument(this.handoffConditionTimeout >= 0, "handoffConditionTimeout must be >= 0"); + Preconditions.checkArgument(this.maxRowsInMemory > 0, "maxRowsInMemory must be > 0"); } @JsonProperty @@ -142,6 +148,12 @@ public int getMaxRowsInMemory() return maxRowsInMemory; } + @JsonProperty + public int getMaxOccupationInMemory() + { + return maxOccupationInMemory; + } + @JsonProperty public Period getIntermediatePersistPeriod() { @@ -224,6 +236,7 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( maxRowsInMemory, + maxOccupationInMemory, intermediatePersistPeriod, windowPeriod, basePersistDirectory, @@ -244,6 +257,7 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir) { return new RealtimeTuningConfig( maxRowsInMemory, + maxOccupationInMemory, intermediatePersistPeriod, windowPeriod, dir, diff --git a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java index ebc9d0e68faf..7c9d145b72f1 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java @@ -37,8 +37,8 @@ public class FireHydrant private final int count; private final Object swapLock = new Object(); - private volatile IncrementalIndex index; - private volatile ReferenceCountingSegment adapter; + private IncrementalIndex index; + private ReferenceCountingSegment adapter; public FireHydrant( IncrementalIndex index, @@ -61,14 +61,36 @@ public FireHydrant( this.count = count; } + public int rowCount() { + synchronized (swapLock) { + return index == null ? 0 : index.size(); + } + } + + public int estimatedOccupation() { + synchronized (swapLock) { + return index == null ? 0 : index.estimatedOccupation(); + } + } + + public boolean canAppendRow() { + synchronized (swapLock) { + return index != null && index.canAppendRow(); + } + } + public IncrementalIndex getIndex() { - return index; + synchronized (swapLock) { + return index; + } } public Segment getSegment() { - return adapter; + synchronized (swapLock) { + return adapter; + } } public int getCount() @@ -78,7 +100,9 @@ public int getCount() public boolean hasSwapped() { - return index == null; + synchronized (swapLock) { + return index == null; + } } public void swapSegment(Segment newAdapter) diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 9d013ea7c507..6f26ba99d8a5 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -331,9 +331,9 @@ public ListenableFuture persistAll(final Committer committer) final Map commitHydrants = Maps.newHashMap(); final List> indexesToPersist = Lists.newArrayList(); - final Set identifiers = sinks.keySet(); - for (SegmentIdentifier identifier : identifiers) { - final Sink sink = sinks.get(identifier); + for (Map.Entry entry : sinks.entrySet()) { + final Sink sink = entry.getValue(); + final SegmentIdentifier identifier = entry.getKey(); final List hydrants = Lists.newArrayList(sink); commitHydrants.put(identifier, hydrants.size()); @@ -496,6 +496,10 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink return null; } + if (sink.isWritable()) { + throw new ISE("WTF?! Expected sink to be no longer writable before mergeAndPush. Segment[%s].", identifier); + } + // Use a descriptor file to indicate that pushing has completed. final File persistDir = computePersistDir(identifier); final File mergedTarget = new File(persistDir, "merged"); @@ -503,10 +507,6 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink // Sanity checks for (FireHydrant hydrant : sink) { - if (sink.isWritable()) { - throw new ISE("WTF?! Expected sink to be no longer writable before mergeAndPush. Segment[%s].", identifier); - } - synchronized (hydrant) { if (!hydrant.hasSwapped()) { throw new ISE("WTF?! Expected sink to be fully persisted before mergeAndPush. Segment[%s].", identifier); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 81b13145b2dd..3296246c0f38 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -93,6 +93,7 @@ public class RealtimePlumber implements Plumber { private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class); private static final int WARN_DELAY = 1000; + private static final int MAX_ROW_EXCEED_CHECK_COUNT = 100000; private final DataSchema schema; private final RealtimeTuningConfig config; @@ -111,6 +112,8 @@ public class RealtimePlumber implements Plumber private final Cache cache; + private final int maxRowExceedCheckCount; + private volatile long nextFlush = 0; private volatile boolean shuttingDown = false; private volatile boolean stopped = false; @@ -124,6 +127,8 @@ public class RealtimePlumber implements Plumber private static final String COMMIT_METADATA_KEY = "%commitMetadata%"; private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%"; + private int checkCounter; + public RealtimePlumber( DataSchema schema, RealtimeTuningConfig config, @@ -163,6 +168,11 @@ public RealtimePlumber( cache, cacheConfig ); + this.maxRowExceedCheckCount = Math.min(config.getMaxRowsInMemory() >> 2, MAX_ROW_EXCEED_CHECK_COUNT); + + if (!cache.isLocal()) { + log.error("Configured cache is not local, caching will not be enabled"); + } log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy()); } @@ -215,9 +225,34 @@ public int add(InputRow row, Supplier committerSupplier) throws Index persist(committerSupplier.get()); } + if (maxRowExceedCheckCount > 0 && ++checkCounter % maxRowExceedCheckCount == 0) { + if (rowCountInMemory() > config.getMaxRowsInMemory() || + occupationInMemory() > config.getMaxOccupationInMemory()) { + persistOldest(committerSupplier.get()); + } + } + return numRows; } + private int rowCountInMemory() + { + int size = 0; + for (Sink aSink : sinks.values()) { + size += aSink.rowCountInMemory(); + } + return size; + } + + private int occupationInMemory() + { + int size = 0; + for (Sink aSink : sinks.values()) { + size += aSink.occupationInMemory(); + } + return size; + } + private Sink getSink(long timestamp) { if (!rejectionPolicy.accept(timestamp)) { @@ -263,12 +298,36 @@ public QueryRunner getQueryRunner(final Query query) public void persist(final Committer committer) { final List> indexesToPersist = Lists.newArrayList(); + for (Sink sink : sinks.values()) { + FireHydrant hydrant = sink.swap(); + if (hydrant != null) { + indexesToPersist.add(Pair.of(hydrant, sink.getInterval())); + } + } + persist(committer, indexesToPersist); + } + + public void persistOldest(final Committer committer) + { + Sink oldestSink = null; + long oldestAccessTime = -1; for (Sink sink : sinks.values()) { if (sink.swappable()) { - indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval())); + long lastAccessTime = sink.getLastAccessTime(); + if (oldestAccessTime < 0 || lastAccessTime < oldestAccessTime) { + oldestAccessTime = lastAccessTime; + oldestSink = sink; + } } } + FireHydrant hydrant = oldestSink == null ? null : oldestSink.swap(); + if (hydrant != null) { + persist(committer, Arrays.asList(Pair.of(hydrant, oldestSink.getInterval()))); + } + } + private void persist(final Committer committer, final List> indexesToPersist) + { log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); final Stopwatch runExecStopwatch = Stopwatch.createStarted(); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 0011df0f4421..061ddaa666e2 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -19,13 +19,13 @@ package io.druid.segment.realtime.plumber; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.metamx.common.IAE; import com.metamx.common.ISE; import io.druid.data.input.InputRow; import io.druid.query.aggregation.AggregatorFactory; @@ -41,6 +41,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashSet; @@ -62,8 +63,14 @@ public class Sink implements Iterable private final CopyOnWriteArrayList hydrants = new CopyOnWriteArrayList(); private final LinkedHashSet dimOrder = Sets.newLinkedHashSet(); private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger(); - private volatile FireHydrant currHydrant; - private volatile boolean writable = true; + + @GuardedBy("hydrantLock") + private boolean writable = true; + + @GuardedBy("hydrantLock") + private FireHydrant currHydrant; // cannot be null after object init + + private volatile long lastAccessTime = 0; public Sink( Interval interval, @@ -125,16 +132,22 @@ public Interval getInterval() return interval; } - public FireHydrant getCurrHydrant() + @VisibleForTesting + FireHydrant getCurrHydrant() + { + synchronized (hydrantLock) { + return currHydrant; + } + } + + public long getLastAccessTime() { - return currHydrant; + return lastAccessTime; } public int add(InputRow row) throws IndexSizeExceededException { - if (currHydrant == null) { - throw new IAE("No currHydrant but given row[%s]", row); - } + lastAccessTime = System.currentTimeMillis(); synchronized (hydrantLock) { if (!writable) { @@ -152,48 +165,52 @@ public int add(InputRow row) throws IndexSizeExceededException public boolean canAppendRow() { synchronized (hydrantLock) { - return writable && currHydrant != null && currHydrant.getIndex().canAppendRow(); + return writable && currHydrant.canAppendRow(); } } - public boolean isEmpty() + public int rowCountInMemory() { synchronized (hydrantLock) { - return hydrants.size() == 1 && currHydrant.getIndex().isEmpty(); + return currHydrant.rowCount(); } } - public boolean isWritable() + public int occupationInMemory() { - return writable; + synchronized (hydrantLock) { + return currHydrant.estimatedOccupation(); + } } - /** - * If currHydrant is A, creates a new index B, sets currHydrant to B and returns A. - * - * @return the current index after swapping in a new one - */ - public FireHydrant swap() + public boolean isWritable() { - return makeNewCurrIndex(interval.getStartMillis(), schema); + synchronized (hydrantLock) { + return writable; + } } - public boolean swappable() + public void finishWriting() { synchronized (hydrantLock) { - return writable && currHydrant.getIndex() != null && currHydrant.getIndex().size() != 0; + writable = false; } } - public boolean finished() + /** + * If currHydrant is A, creates a new index B, sets currHydrant to B and returns A. + * + * @return the current index after swapping in a new one. can be null + */ + public FireHydrant swap() { - return !writable; + return makeNewCurrIndex(interval.getStartMillis(), schema); } - public void finishWriting() + public boolean swappable() { synchronized (hydrantLock) { - writable = false; + return writable && currHydrant.rowCount() > 0; } } @@ -224,7 +241,7 @@ public String apply(@Nullable AggregatorFactory input) public int getNumRows() { synchronized (hydrantLock) { - return numRowsExcludingCurrIndex.get() + currHydrant.getIndex().size(); + return numRowsExcludingCurrIndex.get() + currHydrant.rowCount(); } } @@ -237,40 +254,42 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) .withDimensionsSpec(schema.getParser()) .withMetrics(schema.getAggregators()) .build(); - final IncrementalIndex newIndex = new OnheapIncrementalIndex(indexSchema, reportParseExceptions, maxRowsInMemory); final FireHydrant old; synchronized (hydrantLock) { - if (writable) { - old = currHydrant; - int newCount = 0; - int numHydrants = hydrants.size(); - if (numHydrants > 0) { - FireHydrant lastHydrant = hydrants.get(numHydrants - 1); - newCount = lastHydrant.getCount() + 1; - if (!indexSchema.getDimensionsSpec().hasCustomDimensions()) { - if (lastHydrant.hasSwapped()) { - QueryableIndex oldIndex = lastHydrant.getSegment().asQueryableIndex(); - for (String dim : oldIndex.getAvailableDimensions()) { - dimOrder.add(dim); - } - } else { - IncrementalIndex oldIndex = lastHydrant.getIndex(); - dimOrder.addAll(oldIndex.getDimensionOrder()); + if (!writable) { + // Oops, someone called finishWriting while we were making this new index. + throw new ISE("finishWriting() called during swap"); + } + + old = currHydrant; + if (old != null && old.rowCount() == 0) { + return null; + } + + final IncrementalIndex newIndex = new OnheapIncrementalIndex( + indexSchema, reportParseExceptions, maxRowsInMemory + ); + + int newCount = 0; + if (old != null) { + newCount = old.getCount() + 1; + if (!indexSchema.getDimensionsSpec().hasCustomDimensions()) { + IncrementalIndex oldIndex = old.getIndex(); + if (oldIndex == null) { + QueryableIndex storedIndex = old.getSegment().asQueryableIndex(); + for (String dim : storedIndex.getAvailableDimensions()) { + dimOrder.add(dim); } - newIndex.loadDimensionIterable(dimOrder); + } else { + dimOrder.addAll(oldIndex.getDimensionOrder()); } + newIndex.loadDimensionIterable(dimOrder); } - currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier()); - if (old != null) { - numRowsExcludingCurrIndex.addAndGet(old.getIndex().size()); - } - hydrants.add(currHydrant); - } else { - // Oops, someone called finishWriting while we were making this new index. - newIndex.close(); - throw new ISE("finishWriting() called during swap"); + numRowsExcludingCurrIndex.addAndGet(old.rowCount()); } + currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier()); + hydrants.add(currHydrant); } return old; diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 456185d0e75f..1c82da126438 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -190,6 +190,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException ); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, @@ -245,6 +246,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException tuningConfig_0 = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, @@ -262,6 +264,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException tuningConfig_1 = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 82ff6ba3fa9d..72b90dc09514 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -71,6 +71,7 @@ EasyMock. anyObject(), null, null, null, + null, new IntervalStartVersioningPolicy(), new NoopRejectionPolicyFactory(), null, diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 8723912a52c2..cc1c5224ae86 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -125,6 +125,7 @@ public AppenderatorTester( maxRowsInMemory, null, null, + null, basePersistDirectory, null, null, diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index b604ae713e86..fafbffff23e9 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -195,6 +195,7 @@ public void setUp() throws Exception null, null, null, + null, new IntervalStartVersioningPolicy(), rejectionPolicy, null, diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index a086c4ed849b..2ec789b3ae14 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -59,6 +59,7 @@ public void testSwap() throws Exception final String version = new DateTime().toString(); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( 100, + null, new Period("P1Y"), null, null, diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java index 81fe013941cd..12879c64df31 100644 --- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java @@ -169,6 +169,7 @@ public Plumber findPlumber( new RealtimeTuningConfig( 1, + null, new Period("PT10M"), null, null,