From b98ed370e5880a1fce8f27640029f7cd9cfd044c Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 12 Feb 2016 21:04:08 +0900 Subject: [PATCH 1/6] flush oldest sink on when excceding maxRowsInMemory --- .../common/index/YeOldePlumberSchool.java | 6 +- .../segment/ReferenceCountingSegment.java | 9 ++ .../druid/segment/realtime/FireHydrant.java | 24 +++- .../appenderator/AppenderatorImpl.java | 13 +- .../realtime/plumber/RealtimePlumber.java | 39 +++++- .../druid/segment/realtime/plumber/Sink.java | 116 ++++++++++-------- 6 files changed, 141 insertions(+), 66 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 02d256c55d6d..893afb611815 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -175,7 +175,7 @@ public void finishJob() try { // User should have persisted everything by now. - Preconditions.checkState(!theSink.swappable(), "All data must be persisted before fininshing the job!"); + Preconditions.checkState(!theSink.swappable(), "All data must be persisted before finishing the job!"); if (spilled.size() == 0) { throw new IllegalStateException("Nothing indexed?"); @@ -225,8 +225,8 @@ public void finishJob() private void spillIfSwappable() { - if (theSink.swappable()) { - final FireHydrant indexToPersist = theSink.swap(); + final FireHydrant indexToPersist = theSink.swap(); + if (indexToPersist != null) { final int rowsToPersist = indexToPersist.getIndex().size(); final File dirToPersist = getSpillDir(indexToPersist.getCount()); diff --git a/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java index 9eabf277f0b7..bd9d5cb8e305 100644 --- a/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java +++ b/processing/src/main/java/io/druid/segment/ReferenceCountingSegment.java @@ -181,4 +181,13 @@ private void innerClose() throws IOException baseSegment.close(); } } + + @Override + public String toString() + { + return "ReferenceCountingSegment{" + + "baseSegment=" + baseSegment.getIdentifier() + + ", numReferences=" + numReferences + + '}'; + } } diff --git a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java index ebc9d0e68faf..ca8a4becfd47 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java @@ -61,14 +61,30 @@ public FireHydrant( this.count = count; } + public int indexSize() { + synchronized (swapLock) { + return index == null ? 0 : index.size(); + } + } + + public boolean canAppendRow() { + synchronized (swapLock) { + return index != null && index.canAppendRow(); + } + } + public IncrementalIndex getIndex() { - return index; + synchronized (swapLock) { + return index; + } } public Segment getSegment() { - return adapter; + synchronized (swapLock) { + return adapter; + } } public int getCount() @@ -78,7 +94,9 @@ public int getCount() public boolean hasSwapped() { - return index == null; + synchronized (swapLock) { + return index == null; + } } public void swapSegment(Segment newAdapter) diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 9d013ea7c507..ab61db709dd8 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -346,8 +346,9 @@ public ListenableFuture persistAll(final Committer committer) } } - if (sink.swappable()) { - indexesToPersist.add(Pair.of(sink.swap(), identifier)); + FireHydrant swap = sink.swap(); + if (swap != null) { + indexesToPersist.add(Pair.of(swap, identifier)); } } @@ -503,10 +504,6 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink // Sanity checks for (FireHydrant hydrant : sink) { - if (sink.isWritable()) { - throw new ISE("WTF?! Expected sink to be no longer writable before mergeAndPush. Segment[%s].", identifier); - } - synchronized (hydrant) { if (!hydrant.hasSwapped()) { throw new ISE("WTF?! Expected sink to be fully persisted before mergeAndPush. Segment[%s].", identifier); @@ -514,6 +511,10 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink } } + if (sink.isWritable()) { + throw new ISE("WTF?! Expected sink to be no longer writable before mergeAndPush. Segment[%s].", identifier); + } + try { if (descriptorFile.exists()) { // Already pushed. diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 81b13145b2dd..d721f2111bc8 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -93,6 +93,7 @@ public class RealtimePlumber implements Plumber { private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class); private static final int WARN_DELAY = 1000; + private static final int CHECK_INTERNAL_COUNT = 100000; private final DataSchema schema; private final RealtimeTuningConfig config; @@ -124,6 +125,8 @@ public class RealtimePlumber implements Plumber private static final String COMMIT_METADATA_KEY = "%commitMetadata%"; private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%"; + private int counter; + public RealtimePlumber( DataSchema schema, RealtimeTuningConfig config, @@ -215,6 +218,16 @@ public int add(InputRow row, Supplier committerSupplier) throws Index persist(committerSupplier.get()); } + if (++counter % CHECK_INTERNAL_COUNT == 0) { + int size = 0; + for (Sink aSink : sinks.values()) { + size += aSink.sizeInMemory(); + } + if (size > config.getMaxRowsInMemory()) { + persistOldest(committerSupplier.get()); + } + } + return numRows; } @@ -263,12 +276,36 @@ public QueryRunner getQueryRunner(final Query query) public void persist(final Committer committer) { final List> indexesToPersist = Lists.newArrayList(); + for (Sink sink : sinks.values()) { + FireHydrant hydrant = sink.swap(); + if (hydrant != null) { + indexesToPersist.add(Pair.of(hydrant, sink.getInterval())); + } + } + persist(committer, indexesToPersist); + } + + public void persistOldest(final Committer committer) + { + Sink oldestSink = null; + long oldestAccessTime = -1; for (Sink sink : sinks.values()) { if (sink.swappable()) { - indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval())); + long lastAccessTime = sink.getLastAccessTime(); + if (oldestAccessTime < 0 || lastAccessTime < oldestAccessTime) { + oldestAccessTime = lastAccessTime; + oldestSink = sink; + } } } + FireHydrant hydrant = oldestSink == null ? null : oldestSink.swap(); + if (hydrant != null) { + persist(committer, Arrays.asList(Pair.of(hydrant, oldestSink.getInterval()))); + } + } + private void persist(final Committer committer, final List> indexesToPersist) + { log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); final Stopwatch runExecStopwatch = Stopwatch.createStarted(); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 0011df0f4421..debd7306968f 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -25,7 +25,6 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.metamx.common.IAE; import com.metamx.common.ISE; import io.druid.data.input.InputRow; import io.druid.query.aggregation.AggregatorFactory; @@ -41,6 +40,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashSet; @@ -62,8 +62,14 @@ public class Sink implements Iterable private final CopyOnWriteArrayList hydrants = new CopyOnWriteArrayList(); private final LinkedHashSet dimOrder = Sets.newLinkedHashSet(); private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger(); - private volatile FireHydrant currHydrant; - private volatile boolean writable = true; + + @GuardedBy("hydrantLock") + private boolean writable = true; + + @GuardedBy("hydrantLock") + private FireHydrant currHydrant; // cannot be null after object init + + private long lastAccessTime = 0; public Sink( Interval interval, @@ -127,14 +133,19 @@ public Interval getInterval() public FireHydrant getCurrHydrant() { - return currHydrant; + synchronized (hydrantLock) { + return currHydrant; + } + } + + public long getLastAccessTime() + { + return lastAccessTime; } public int add(InputRow row) throws IndexSizeExceededException { - if (currHydrant == null) { - throw new IAE("No currHydrant but given row[%s]", row); - } + lastAccessTime = System.currentTimeMillis(); synchronized (hydrantLock) { if (!writable) { @@ -152,26 +163,35 @@ public int add(InputRow row) throws IndexSizeExceededException public boolean canAppendRow() { synchronized (hydrantLock) { - return writable && currHydrant != null && currHydrant.getIndex().canAppendRow(); + return writable && currHydrant.canAppendRow(); } } - public boolean isEmpty() + public int sizeInMemory() { synchronized (hydrantLock) { - return hydrants.size() == 1 && currHydrant.getIndex().isEmpty(); + return currHydrant.indexSize(); } } public boolean isWritable() { - return writable; + synchronized (hydrantLock) { + return writable; + } + } + + public void finishWriting() + { + synchronized (hydrantLock) { + writable = false; + } } /** * If currHydrant is A, creates a new index B, sets currHydrant to B and returns A. * - * @return the current index after swapping in a new one + * @return the current index after swapping in a new one. can be null */ public FireHydrant swap() { @@ -181,19 +201,7 @@ public FireHydrant swap() public boolean swappable() { synchronized (hydrantLock) { - return writable && currHydrant.getIndex() != null && currHydrant.getIndex().size() != 0; - } - } - - public boolean finished() - { - return !writable; - } - - public void finishWriting() - { - synchronized (hydrantLock) { - writable = false; + return writable && currHydrant.indexSize() > 0; } } @@ -224,7 +232,7 @@ public String apply(@Nullable AggregatorFactory input) public int getNumRows() { synchronized (hydrantLock) { - return numRowsExcludingCurrIndex.get() + currHydrant.getIndex().size(); + return numRowsExcludingCurrIndex.get() + currHydrant.indexSize(); } } @@ -237,40 +245,42 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) .withDimensionsSpec(schema.getParser()) .withMetrics(schema.getAggregators()) .build(); - final IncrementalIndex newIndex = new OnheapIncrementalIndex(indexSchema, reportParseExceptions, maxRowsInMemory); final FireHydrant old; synchronized (hydrantLock) { - if (writable) { - old = currHydrant; - int newCount = 0; - int numHydrants = hydrants.size(); - if (numHydrants > 0) { - FireHydrant lastHydrant = hydrants.get(numHydrants - 1); - newCount = lastHydrant.getCount() + 1; - if (!indexSchema.getDimensionsSpec().hasCustomDimensions()) { - if (lastHydrant.hasSwapped()) { - QueryableIndex oldIndex = lastHydrant.getSegment().asQueryableIndex(); - for (String dim : oldIndex.getAvailableDimensions()) { - dimOrder.add(dim); - } - } else { - IncrementalIndex oldIndex = lastHydrant.getIndex(); - dimOrder.addAll(oldIndex.getDimensionOrder()); + if (!writable) { + // Oops, someone called finishWriting while we were making this new index. + throw new ISE("finishWriting() called during swap"); + } + + old = currHydrant; + if (old != null && old.indexSize() == 0) { + return null; + } + + final IncrementalIndex newIndex = new OnheapIncrementalIndex( + indexSchema, reportParseExceptions, maxRowsInMemory + ); + + int newCount = 0; + if (old != null) { + newCount = old.getCount() + 1; + if (!indexSchema.getDimensionsSpec().hasCustomDimensions()) { + IncrementalIndex oldIndex = old.getIndex(); + if (oldIndex == null) { + QueryableIndex storedIndex = old.getSegment().asQueryableIndex(); + for (String dim : storedIndex.getAvailableDimensions()) { + dimOrder.add(dim); } - newIndex.loadDimensionIterable(dimOrder); + } else { + dimOrder.addAll(oldIndex.getDimensionOrder()); } + newIndex.loadDimensionIterable(dimOrder); } - currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier()); - if (old != null) { - numRowsExcludingCurrIndex.addAndGet(old.getIndex().size()); - } - hydrants.add(currHydrant); - } else { - // Oops, someone called finishWriting while we were making this new index. - newIndex.close(); - throw new ISE("finishWriting() called during swap"); + numRowsExcludingCurrIndex.addAndGet(old.indexSize()); } + currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier()); + hydrants.add(currHydrant); } return old; From 7a0f37dd2316ef931bef28d52132ab01a11695d9 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Thu, 18 Feb 2016 08:54:02 +0900 Subject: [PATCH 2/6] addressed comments --- .../src/main/java/io/druid/segment/realtime/FireHydrant.java | 4 ++-- .../io/druid/segment/realtime/plumber/RealtimePlumber.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java index ca8a4becfd47..798d2655d124 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java @@ -37,8 +37,8 @@ public class FireHydrant private final int count; private final Object swapLock = new Object(); - private volatile IncrementalIndex index; - private volatile ReferenceCountingSegment adapter; + private IncrementalIndex index; + private ReferenceCountingSegment adapter; public FireHydrant( IncrementalIndex index, diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index d721f2111bc8..436cb912824f 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -93,7 +93,7 @@ public class RealtimePlumber implements Plumber { private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class); private static final int WARN_DELAY = 1000; - private static final int CHECK_INTERNAL_COUNT = 100000; + private static final int MAX_ROW_EXCEED_CHECK_COUNT = 100000; private final DataSchema schema; private final RealtimeTuningConfig config; @@ -218,7 +218,7 @@ public int add(InputRow row, Supplier committerSupplier) throws Index persist(committerSupplier.get()); } - if (++counter % CHECK_INTERNAL_COUNT == 0) { + if (++counter % MAX_ROW_EXCEED_CHECK_COUNT == 0) { int size = 0; for (Sink aSink : sinks.values()) { size += aSink.sizeInMemory(); From afe54447e1a00b78416cac963aa434ffc673d302 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 11 Mar 2016 13:37:04 +0900 Subject: [PATCH 3/6] addressed comments --- .../io/druid/segment/indexing/RealtimeTuningConfig.java | 1 + .../druid/segment/realtime/plumber/RealtimePlumber.java | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 14dc723e9add..439c786257fa 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -134,6 +134,7 @@ public RealtimeTuningConfig( ? defaultHandoffConditionTimeout : handoffConditionTimeout; Preconditions.checkArgument(this.handoffConditionTimeout >= 0, "handoffConditionTimeout must be >= 0"); + Preconditions.checkArgument(this.maxRowsInMemory > 0, "maxRowsInMemory must be > 0"); } @JsonProperty diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 436cb912824f..b20c82b1af77 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -112,6 +112,8 @@ public class RealtimePlumber implements Plumber private final Cache cache; + private final int maxRowExceedCheckCount; + private volatile long nextFlush = 0; private volatile boolean shuttingDown = false; private volatile boolean stopped = false; @@ -166,6 +168,11 @@ public RealtimePlumber( cache, cacheConfig ); + this.maxRowExceedCheckCount = Math.min(config.getMaxRowsInMemory() >> 2, MAX_ROW_EXCEED_CHECK_COUNT); + + if (!cache.isLocal()) { + log.error("Configured cache is not local, caching will not be enabled"); + } log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy()); } @@ -218,7 +225,7 @@ public int add(InputRow row, Supplier committerSupplier) throws Index persist(committerSupplier.get()); } - if (++counter % MAX_ROW_EXCEED_CHECK_COUNT == 0) { + if (maxRowExceedCheckCount > 0 && ++counter % maxRowExceedCheckCount == 0) { int size = 0; for (Sink aSink : sinks.values()) { size += aSink.sizeInMemory(); From 6c87e6c462fc7916e614f209f367a47069931f9e Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Tue, 15 Mar 2016 13:59:43 +0900 Subject: [PATCH 4/6] fix test fail --- .../appenderator/AppenderatorImpl.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index ab61db709dd8..6f26ba99d8a5 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -331,9 +331,9 @@ public ListenableFuture persistAll(final Committer committer) final Map commitHydrants = Maps.newHashMap(); final List> indexesToPersist = Lists.newArrayList(); - final Set identifiers = sinks.keySet(); - for (SegmentIdentifier identifier : identifiers) { - final Sink sink = sinks.get(identifier); + for (Map.Entry entry : sinks.entrySet()) { + final Sink sink = entry.getValue(); + final SegmentIdentifier identifier = entry.getKey(); final List hydrants = Lists.newArrayList(sink); commitHydrants.put(identifier, hydrants.size()); @@ -346,9 +346,8 @@ public ListenableFuture persistAll(final Committer committer) } } - FireHydrant swap = sink.swap(); - if (swap != null) { - indexesToPersist.add(Pair.of(swap, identifier)); + if (sink.swappable()) { + indexesToPersist.add(Pair.of(sink.swap(), identifier)); } } @@ -497,6 +496,10 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink return null; } + if (sink.isWritable()) { + throw new ISE("WTF?! Expected sink to be no longer writable before mergeAndPush. Segment[%s].", identifier); + } + // Use a descriptor file to indicate that pushing has completed. final File persistDir = computePersistDir(identifier); final File mergedTarget = new File(persistDir, "merged"); @@ -511,10 +514,6 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink } } - if (sink.isWritable()) { - throw new ISE("WTF?! Expected sink to be no longer writable before mergeAndPush. Segment[%s].", identifier); - } - try { if (descriptorFile.exists()) { // Already pushed. From 07cc583734939545d4123be81f6cbcb4e7b07a45 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Wed, 16 Mar 2016 11:00:58 +0900 Subject: [PATCH 5/6] flush based on memory occupation --- .../druid/indexing/common/task/IndexTask.java | 1 + .../common/task/RealtimeIndexTaskTest.java | 1 + .../indexing/common/task/TaskSerdeTest.java | 1 + .../indexing/overlord/TaskLifecycleTest.java | 1 + .../druid/query/metadata/SegmentAnalyzer.java | 3 +- .../segment/incremental/IncrementalIndex.java | 62 +++++++++++++++++-- .../incremental/OffheapIncrementalIndex.java | 4 +- .../incremental/OnheapIncrementalIndex.java | 18 +++++- .../indexing/RealtimeTuningConfig.java | 13 ++++ .../druid/segment/realtime/FireHydrant.java | 8 ++- .../realtime/plumber/RealtimePlumber.java | 25 ++++++-- .../druid/segment/realtime/plumber/Sink.java | 19 ++++-- .../segment/realtime/RealtimeManagerTest.java | 3 + .../appenderator/AppenderatorPlumberTest.java | 1 + .../appenderator/AppenderatorTester.java | 1 + .../plumber/RealtimePlumberSchoolTest.java | 1 + .../segment/realtime/plumber/SinkTest.java | 1 + .../cli/validate/DruidJsonValidatorTest.java | 1 + 18 files changed, 142 insertions(+), 22 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 4918af9e4025..556b8b02caaa 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -138,6 +138,7 @@ static RealtimeTuningConfig convertTuningConfig( null, null, null, + null, shardSpec, indexSpec, buildV9Directly, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index f71327daaeea..758689306d23 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -898,6 +898,7 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportPa ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( 1000, + null, new Period("P1Y"), new Period("PT10M"), null, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 86b584ca8c5e..aeab2c96b104 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -325,6 +325,7 @@ public Plumber findPlumber( new RealtimeTuningConfig( 1, + null, new Period("PT10M"), null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index d85e586d8afe..19349198f833 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -1177,6 +1177,7 @@ private RealtimeIndexTask newRealtimeIndexTask() ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( 1000, + null, new Period("P1Y"), null, //default window period of 10 minutes null, // base persist dir ignored by Realtime Index task diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java index 406740636376..37f70bda2111 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java @@ -206,7 +206,8 @@ private ColumnAnalysis analyzeStringColumn( for (int i = 0; i < cardinality; ++i) { String value = bitmapIndex.getValue(i); if (value != null) { - size += StringUtils.estimatedBinaryLengthAsUTF8(value) * bitmapIndex.getBitmap(bitmapIndex.getIndex(value)).size(); + size += StringUtils.estimatedBinaryLengthAsUTF8(value) * + bitmapIndex.getBitmap(bitmapIndex.getIndex(value)).size(); } } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 271107534850..df3cf610af46 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -34,6 +34,7 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.parsers.ParseException; +import io.druid.common.utils.StringUtils; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -356,6 +357,7 @@ public int lookupId(String name) private final List> rowTransformers; private final AggregatorFactory[] metrics; private final AggregatorType[] aggs; + private final int maxLengthForAggregators; private final boolean deserializeComplexMetrics; private final boolean reportParseExceptions; private final boolean sortFacts; @@ -450,19 +452,24 @@ public IncrementalIndex( if (!spatialDimensions.isEmpty()) { this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions)); } + int length = 0; + for (AggregatorFactory factory : metrics) { + length += factory.getMaxIntermediateSize(); + } + this.maxLengthForAggregators = length; } private DimDim newDimDim(String dimension, ValueType type) { DimDim newDimDim; switch (type) { case LONG: - newDimDim = makeDimDim(dimension, getDimensionDescs()); + newDimDim = makeDimDim(dimension, SizeEstimator.LONG, getDimensionDescs()); break; case FLOAT: - newDimDim = makeDimDim(dimension, getDimensionDescs()); + newDimDim = makeDimDim(dimension, SizeEstimator.FLOAT, getDimensionDescs()); break; case STRING: - newDimDim = new NullValueConverterDimDim(makeDimDim(dimension, getDimensionDescs())); + newDimDim = new NullValueConverterDimDim(makeDimDim(dimension, SizeEstimator.STRING, getDimensionDescs())); break; default: throw new IAE("Invalid column type: " + type); @@ -471,7 +478,7 @@ private DimDim newDimDim(String dimension, ValueType type) { } // use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation - protected abstract DimDim makeDimDim(String dimension, Object lock); + protected abstract DimDim makeDimDim(String dimension, SizeEstimator estimator, Object lock); public abstract ConcurrentMap getFacts(); @@ -692,6 +699,15 @@ public int size() return numEntries.get(); } + public int estimatedOccupation() + { + int occupation = maxLengthForAggregators * getFacts().size(); + for (DimensionDesc dimensionDesc : dimensionDescs.values()) { + occupation += dimensionDesc.getValues().estimatedSize(); + } + return occupation; + } + private long getMinTimeMillis() { if (sortFacts) { @@ -1050,6 +1066,36 @@ public ColumnCapabilitiesImpl getCapabilities() } } + static interface SizeEstimator { + + int estimate(T object); + + SizeEstimator STRING = new SizeEstimator() + { + @Override + public int estimate(String object) + { + return object == null ? 0 : StringUtils.estimatedBinaryLengthAsUTF8(object); + } + }; + SizeEstimator FLOAT = new SizeEstimator() + { + @Override + public int estimate(Float object) + { + return object == null ? 0 : 4; + } + }; + SizeEstimator LONG = new SizeEstimator() + { + @Override + public int estimate(Long object) + { + return object == null ? 0 : 8; + } + }; + } + static interface DimDim> { public int getId(T value); @@ -1064,6 +1110,8 @@ static interface DimDim> public T getMaxValue(); + public int estimatedSize(); + public int add(T value); public SortedDimLookup sort(); @@ -1128,6 +1176,12 @@ public String getMaxValue() return Strings.nullToEmpty(delegate.getMaxValue()); } + @Override + public int estimatedSize() + { + return delegate.estimatedSize(); + } + @Override public int add(String value) { diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 6dbb924a8f89..275108fd4293 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -151,9 +151,9 @@ public ConcurrentMap getFacts() } @Override - protected DimDim makeDimDim(String dimension, Object lock) + protected DimDim makeDimDim(String dimension, SizeEstimator estimator, Object lock) { - return new OnheapIncrementalIndex.OnHeapDimDim(lock); + return new OnheapIncrementalIndex.OnHeapDimDim(estimator, lock); } @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 98a6361c9cbb..293c673fc2c9 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -132,9 +132,9 @@ public ConcurrentMap getFacts() } @Override - protected DimDim makeDimDim(String dimension, Object lock) + protected DimDim makeDimDim(String dimension, SizeEstimator estimator, Object lock) { - return new OnHeapDimDim(lock); + return new OnHeapDimDim(estimator, lock); } @Override @@ -326,12 +326,17 @@ static class OnHeapDimDim> implements DimDim private T minValue = null; private T maxValue = null; + private int estimatedSize; + private final List idToValue = Lists.newArrayList(); private final Object lock; - public OnHeapDimDim(Object lock) + private final SizeEstimator estimator; + + public OnHeapDimDim(SizeEstimator estimator, Object lock) { this.lock = lock; + this.estimator = estimator; } public int getId(T value) @@ -375,6 +380,7 @@ public int add(T value) idToValue.add(value); minValue = minValue == null || minValue.compareTo(value) > 0 ? value : minValue; maxValue = maxValue == null || maxValue.compareTo(value) < 0 ? value : maxValue; + estimatedSize += estimator.estimate(value); return index; } } @@ -391,6 +397,12 @@ public T getMaxValue() return maxValue; } + @Override + public int estimatedSize() + { + return estimatedSize; + } + public OnHeapDimLookup sort() { synchronized (lock) { diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 439c786257fa..52f2262b81d4 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -40,6 +40,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig { private static final int defaultMaxRowsInMemory = 75000; + private static final int defaultMaxOccupationInMemory = 64 << 20; private static final Period defaultIntermediatePersistPeriod = new Period("PT10M"); private static final Period defaultWindowPeriod = new Period("PT10M"); private static final VersioningPolicy defaultVersioningPolicy = new IntervalStartVersioningPolicy(); @@ -61,6 +62,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis { return new RealtimeTuningConfig( defaultMaxRowsInMemory, + defaultMaxOccupationInMemory, defaultIntermediatePersistPeriod, defaultWindowPeriod, basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, @@ -78,6 +80,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis } private final int maxRowsInMemory; + private final int maxOccupationInMemory; private final Period intermediatePersistPeriod; private final Period windowPeriod; private final File basePersistDirectory; @@ -95,6 +98,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis @JsonCreator public RealtimeTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + @JsonProperty("maxOccupationInMemory") Integer maxOccupationInMemory, @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @@ -111,6 +115,7 @@ public RealtimeTuningConfig( ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; + this.maxOccupationInMemory = maxOccupationInMemory == null ? defaultMaxOccupationInMemory : maxOccupationInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? defaultIntermediatePersistPeriod : intermediatePersistPeriod; @@ -143,6 +148,12 @@ public int getMaxRowsInMemory() return maxRowsInMemory; } + @JsonProperty + public int getMaxOccupationInMemory() + { + return maxOccupationInMemory; + } + @JsonProperty public Period getIntermediatePersistPeriod() { @@ -225,6 +236,7 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( maxRowsInMemory, + maxOccupationInMemory, intermediatePersistPeriod, windowPeriod, basePersistDirectory, @@ -245,6 +257,7 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir) { return new RealtimeTuningConfig( maxRowsInMemory, + maxOccupationInMemory, intermediatePersistPeriod, windowPeriod, dir, diff --git a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java index 798d2655d124..7c9d145b72f1 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/io/druid/segment/realtime/FireHydrant.java @@ -61,12 +61,18 @@ public FireHydrant( this.count = count; } - public int indexSize() { + public int rowCount() { synchronized (swapLock) { return index == null ? 0 : index.size(); } } + public int estimatedOccupation() { + synchronized (swapLock) { + return index == null ? 0 : index.estimatedOccupation(); + } + } + public boolean canAppendRow() { synchronized (swapLock) { return index != null && index.canAppendRow(); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index b20c82b1af77..d28121efefbb 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -226,11 +226,8 @@ public int add(InputRow row, Supplier committerSupplier) throws Index } if (maxRowExceedCheckCount > 0 && ++counter % maxRowExceedCheckCount == 0) { - int size = 0; - for (Sink aSink : sinks.values()) { - size += aSink.sizeInMemory(); - } - if (size > config.getMaxRowsInMemory()) { + if (rowCountInMemory() > config.getMaxRowsInMemory() || + occupationInMemory() > config.getMaxOccupationInMemory()) { persistOldest(committerSupplier.get()); } } @@ -238,6 +235,24 @@ public int add(InputRow row, Supplier committerSupplier) throws Index return numRows; } + private int rowCountInMemory() + { + int size = 0; + for (Sink aSink : sinks.values()) { + size += aSink.rowCountInMemory(); + } + return size; + } + + private int occupationInMemory() + { + int size = 0; + for (Sink aSink : sinks.values()) { + size += aSink.occupationInMemory(); + } + return size; + } + private Sink getSink(long timestamp) { if (!rejectionPolicy.accept(timestamp)) { diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index debd7306968f..770e79eb2507 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -167,10 +167,17 @@ public boolean canAppendRow() } } - public int sizeInMemory() + public int rowCountInMemory() { synchronized (hydrantLock) { - return currHydrant.indexSize(); + return currHydrant.rowCount(); + } + } + + public int occupationInMemory() + { + synchronized (hydrantLock) { + return currHydrant.estimatedOccupation(); } } @@ -201,7 +208,7 @@ public FireHydrant swap() public boolean swappable() { synchronized (hydrantLock) { - return writable && currHydrant.indexSize() > 0; + return writable && currHydrant.rowCount() > 0; } } @@ -232,7 +239,7 @@ public String apply(@Nullable AggregatorFactory input) public int getNumRows() { synchronized (hydrantLock) { - return numRowsExcludingCurrIndex.get() + currHydrant.indexSize(); + return numRowsExcludingCurrIndex.get() + currHydrant.rowCount(); } } @@ -254,7 +261,7 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) } old = currHydrant; - if (old != null && old.indexSize() == 0) { + if (old != null && old.rowCount() == 0) { return null; } @@ -277,7 +284,7 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) } newIndex.loadDimensionIterable(dimOrder); } - numRowsExcludingCurrIndex.addAndGet(old.indexSize()); + numRowsExcludingCurrIndex.addAndGet(old.rowCount()); } currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier()); hydrants.add(currHydrant); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 456185d0e75f..1c82da126438 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -190,6 +190,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException ); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, @@ -245,6 +246,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException tuningConfig_0 = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, @@ -262,6 +264,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException tuningConfig_1 = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 82ff6ba3fa9d..72b90dc09514 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -71,6 +71,7 @@ EasyMock. anyObject(), null, null, null, + null, new IntervalStartVersioningPolicy(), new NoopRejectionPolicyFactory(), null, diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 8723912a52c2..cc1c5224ae86 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -125,6 +125,7 @@ public AppenderatorTester( maxRowsInMemory, null, null, + null, basePersistDirectory, null, null, diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index b604ae713e86..fafbffff23e9 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -195,6 +195,7 @@ public void setUp() throws Exception null, null, null, + null, new IntervalStartVersioningPolicy(), rejectionPolicy, null, diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index a086c4ed849b..2ec789b3ae14 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -59,6 +59,7 @@ public void testSwap() throws Exception final String version = new DateTime().toString(); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( 100, + null, new Period("P1Y"), null, null, diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java index 81fe013941cd..12879c64df31 100644 --- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java @@ -169,6 +169,7 @@ public Plumber findPlumber( new RealtimeTuningConfig( 1, + null, new Period("PT10M"), null, null, From 782cb89f50fc6983d520c528c889def9e1d826fb Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Wed, 27 Jul 2016 15:55:21 +0900 Subject: [PATCH 6/6] rebased on master & addressed some comments --- .../main/java/io/druid/query/metadata/SegmentAnalyzer.java | 3 +-- .../java/io/druid/segment/incremental/IncrementalIndex.java | 5 +++-- .../druid/segment/incremental/OnheapIncrementalIndex.java | 4 +++- .../io/druid/segment/realtime/plumber/RealtimePlumber.java | 4 ++-- .../main/java/io/druid/segment/realtime/plumber/Sink.java | 6 ++++-- 5 files changed, 13 insertions(+), 9 deletions(-) diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java index 37f70bda2111..406740636376 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java @@ -206,8 +206,7 @@ private ColumnAnalysis analyzeStringColumn( for (int i = 0; i < cardinality; ++i) { String value = bitmapIndex.getValue(i); if (value != null) { - size += StringUtils.estimatedBinaryLengthAsUTF8(value) * - bitmapIndex.getBitmap(bitmapIndex.getIndex(value)).size(); + size += StringUtils.estimatedBinaryLengthAsUTF8(value) * bitmapIndex.getBitmap(bitmapIndex.getIndex(value)).size(); } } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index df3cf610af46..99f5cace022a 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -29,6 +29,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.primitives.Floats; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.metamx.common.IAE; @@ -1083,7 +1084,7 @@ public int estimate(String object) @Override public int estimate(Float object) { - return object == null ? 0 : 4; + return object == null ? 0 : Floats.BYTES; } }; SizeEstimator LONG = new SizeEstimator() @@ -1091,7 +1092,7 @@ public int estimate(Float object) @Override public int estimate(Long object) { - return object == null ? 0 : 8; + return object == null ? 0 : Longs.BYTES; } }; } diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 293c673fc2c9..dad02b623548 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -22,6 +22,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.primitives.Ints; import com.metamx.common.logger.Logger; import com.metamx.common.parsers.ParseException; import io.druid.data.input.InputRow; @@ -373,6 +374,7 @@ public int add(T value) synchronized (lock) { Integer prev = valueToId.get(value); if (prev != null) { + estimatedSize += Ints.BYTES; return prev; } final int index = size(); @@ -380,7 +382,7 @@ public int add(T value) idToValue.add(value); minValue = minValue == null || minValue.compareTo(value) > 0 ? value : minValue; maxValue = maxValue == null || maxValue.compareTo(value) < 0 ? value : maxValue; - estimatedSize += estimator.estimate(value); + estimatedSize += estimator.estimate(value) + Ints.BYTES; return index; } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index d28121efefbb..3296246c0f38 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -127,7 +127,7 @@ public class RealtimePlumber implements Plumber private static final String COMMIT_METADATA_KEY = "%commitMetadata%"; private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%"; - private int counter; + private int checkCounter; public RealtimePlumber( DataSchema schema, @@ -225,7 +225,7 @@ public int add(InputRow row, Supplier committerSupplier) throws Index persist(committerSupplier.get()); } - if (maxRowExceedCheckCount > 0 && ++counter % maxRowExceedCheckCount == 0) { + if (maxRowExceedCheckCount > 0 && ++checkCounter % maxRowExceedCheckCount == 0) { if (rowCountInMemory() > config.getMaxRowsInMemory() || occupationInMemory() > config.getMaxOccupationInMemory()) { persistOldest(committerSupplier.get()); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 770e79eb2507..061ddaa666e2 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -19,6 +19,7 @@ package io.druid.segment.realtime.plumber; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; @@ -69,7 +70,7 @@ public class Sink implements Iterable @GuardedBy("hydrantLock") private FireHydrant currHydrant; // cannot be null after object init - private long lastAccessTime = 0; + private volatile long lastAccessTime = 0; public Sink( Interval interval, @@ -131,7 +132,8 @@ public Interval getInterval() return interval; } - public FireHydrant getCurrHydrant() + @VisibleForTesting + FireHydrant getCurrHydrant() { synchronized (hydrantLock) { return currHydrant;