diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 4eaca8ea6ecd..3639bebd62a5 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -753,9 +753,12 @@ private boolean isPaused()
private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
{
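+ // Derive a per-partition share of the configured maxRowsInMemory by integer division over the number of
+ // Kafka partitions this task starts from; the resulting value is applied to the task's appenderator below.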
+ final int maxRowsInMemoryPerPartition = (tuningConfig.getMaxRowsInMemory() /
+ ioConfig.getStartPartitions().getPartitionOffsetMap().size());
return Appenderators.createRealtime(
dataSchema,
- tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")),
+ tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist"))
+ .withMaxRowsInMemory(maxRowsInMemoryPerPartition),
metrics,
toolbox.getSegmentPusher(),
toolbox.getObjectMapper(),
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java
index 374b2dec909c..8a60822fa0f7 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java
@@ -144,4 +144,19 @@ public KafkaTuningConfig withBasePersistDirectory(File dir)
handoffConditionTimeout
);
}
+
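+ /**
+ * Returns a copy of this tuning config with maxRowsInMemory replaced and every other field carried over
+ * unchanged, following the same pattern as withBasePersistDirectory above. KafkaIndexTask uses this to
+ * pass a per-partition row limit to its appenderator.
+ */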
+ public KafkaTuningConfig withMaxRowsInMemory(int rows)
+ {
+ return new KafkaTuningConfig(
+ rows,
+ maxRowsPerSegment,
+ intermediatePersistPeriod,
+ basePersistDirectory,
+ maxPendingPersists,
+ indexSpec,
+ buildV9Directly,
+ reportParseExceptions,
+ handoffConditionTimeout
+ );
+ }
}
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java
index 7d8e3d60fa2f..be305c5c13ea 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java
@@ -33,7 +33,7 @@
* An Appenderator indexes data. It has some in-memory data and some persisted-on-disk data. It can serve queries on
* both of those. It can also push data to deep storage. But, it does not decide which segments data should go into.
* It also doesn't publish segments to the metadata store or monitor handoff; you have to do that yourself!
- *
+ *
* Any time you call one of the methods that adds, persists, or pushes data, you must provide a Committer, or a
* Supplier of one, that represents all data you have given to the Appenderator so far. The Committer will be used when
* that data has been persisted to disk.
@@ -54,13 +54,13 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
/**
* Add a row. Must not be called concurrently from multiple threads.
- *
+ *
* If no pending segment exists for the provided identifier, a new one will be created.
- *
+ *
* This method may trigger a {@link #persistAll(Committer)} using the supplied Committer. If it does this, the
* Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used
* asynchronously.
- *
+ *
* The add, clear, persistAll, and push methods should all be called from the same thread.
*
* @param identifier the segment into which this row should be added
@@ -95,6 +95,8 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier<Committer> committerSupplier)
* Drop all in-memory and on-disk data, and forget any previously-remembered commit metadata. This could be useful if,
* for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been
* cleared. This may take some time, since all pending persists must finish first.
+ *
+ * The add, clear, persistAll, and push methods should all be called from the same thread.
*/
void clear() throws InterruptedException;
@@ -102,7 +104,7 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier<Committer> committerSupplier)
* Drop all data associated with a particular pending segment. Unlike {@link #clear()}), any on-disk commit
* metadata will remain unchanged. If there is no pending segment with this identifier, then this method will
* do nothing.
- *
+ *
* You should not write to the dropped segment after calling "drop". If you need to drop all your data and
* re-write it, consider {@link #clear()} instead.
*
@@ -117,7 +119,7 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier<Committer> committerSupplier)
* machine's local disk. The Committer will be made synchronously with the call to persistAll, but will actually
* be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to
* disk.
- *
+ *
* The add, clear, persistAll, and push methods should all be called from the same thread.
*
* @param committer a committer associated with all data that has been added so far
@@ -129,9 +131,9 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier<Committer> committerSupplier)
/**
* Merge and push particular segments to deep storage. This will trigger an implicit {@link #persistAll(Committer)}
* using the provided Committer.
- *
+ *
* After this method is called, you cannot add new data to any segments that were previously under construction.
- *
+ *
* The add, clear, persistAll, and push methods should all be called from the same thread.
*
* @param identifiers list of segments to push
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 9d013ea7c507..3ab523e266c6 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -18,6 +18,7 @@
package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -85,6 +86,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
*/
@@ -108,10 +110,11 @@ public class AppenderatorImpl implements Appenderator
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(
String.CASE_INSENSITIVE_ORDER
);
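+ // Total rows currently in memory across all active (non-dropping) sinks; checked against
+ // tuningConfig.getMaxRowsInMemory() in add() and reset by persistAll().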
+ private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
private final QuerySegmentWalker texasRanger;
private volatile ListeningExecutorService persistExecutor = null;
- private volatile ListeningExecutorService mergeExecutor = null;
+ private volatile ListeningExecutorService pushExecutor = null;
private volatile long nextFlush;
private volatile FileLock basePersistDirLock = null;
private volatile FileChannel basePersistDirLockChannel = null;
@@ -188,26 +191,34 @@ public int add(
}
final Sink sink = getOrCreateSink(identifier);
- int sinkRetVal;
+ final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
+ final int sinkRowsInMemoryAfterAdd;
try {
- sinkRetVal = sink.add(row);
+ sinkRowsInMemoryAfterAdd = sink.add(row);
}
catch (IndexSizeExceededException e) {
- // Try one more time after swapping, then throw the exception out if it happens again.
- persistAll(committerSupplier.get());
- sinkRetVal = sink.add(row);
+ // Uh oh, we can't do anything about this! We can't persist (commit metadata would be out of sync) and we
+ // can't add the row (it just failed). This should never actually happen, though, because we check
+ // sink.canAppendRow after returning from add.
+ log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier);
+ throw e;
}
- if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
- persistAll(committerSupplier.get());
+ if (sinkRowsInMemoryAfterAdd < 0) {
+ throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
}
- if (sinkRetVal < 0) {
- throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
- } else {
- return sink.getNumRows();
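+ // Fold the rows just added to this sink's in-memory index into the appenderator-wide counter.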
+ rowsCurrentlyInMemory.addAndGet(sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd);
+
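+ // Persist when the sink itself can accept no more rows, when the intermediate persist period has elapsed,
+ // or when the total in-memory row count across sinks reaches the configured maxRowsInMemory.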
+ if (!sink.canAppendRow()
+ || System.currentTimeMillis() > nextFlush
+ || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
+ // persistAll clears rowsCurrentlyInMemory, no need to update it.
+ persistAll(committerSupplier.get());
}
+
+ return sink.getNumRows();
}
@Override
@@ -228,6 +239,12 @@ public int getRowCount(final SegmentIdentifier identifier)
}
}
+ @VisibleForTesting
+ int getRowsInMemory()
+ {
+ return rowsCurrentlyInMemory.get();
+ }
+
private Sink getOrCreateSink(final SegmentIdentifier identifier)
{
Sink retVal = sinks.get(identifier);
@@ -410,6 +427,9 @@ public String apply(Map.Entry entry)
runExecStopwatch.stop();
resetNextFlush();
+ // NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes.
+ rowsCurrentlyInMemory.set(0);
+
return future;
}
@@ -455,7 +475,7 @@ public SegmentsAndMetadata apply(Object commitMetadata)
return new SegmentsAndMetadata(dataSegments, commitMetadata);
}
},
- mergeExecutor
+ pushExecutor
);
}
@@ -464,9 +484,9 @@ public SegmentsAndMetadata apply(Object commitMetadata)
* This is useful if we're going to do something that would otherwise potentially break currently in-progress
* pushes.
*/
- private ListenableFuture<?> mergeBarrier()
+ private ListenableFuture<?> pushBarrier()
{
- return mergeExecutor.submit(
+ return pushExecutor.submit(
new Runnable()
{
@Override
@@ -480,7 +500,7 @@ public void run()
/**
* Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only
- * be run in the single-threaded mergeExecutor.
+ * be run in the single-threaded pushExecutor.
*
* @param identifier sink identifier
* @param sink sink to push
@@ -589,7 +609,7 @@ public void close()
try {
shutdownExecutors();
Preconditions.checkState(persistExecutor.awaitTermination(365, TimeUnit.DAYS), "persistExecutor not terminated");
- Preconditions.checkState(mergeExecutor.awaitTermination(365, TimeUnit.DAYS), "mergeExecutor not terminated");
+ Preconditions.checkState(pushExecutor.awaitTermination(365, TimeUnit.DAYS), "pushExecutor not terminated");
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -647,9 +667,9 @@ private void initializeExecutors()
)
);
}
- if (mergeExecutor == null) {
+ if (pushExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
- mergeExecutor = MoreExecutors.listeningDecorator(
+ pushExecutor = MoreExecutors.listeningDecorator(
Execs.newBlockingSingleThreaded(
"appenderator_merge_%d", 1
)
@@ -660,7 +680,7 @@ private void initializeExecutors()
private void shutdownExecutors()
{
persistExecutor.shutdownNow();
- mergeExecutor.shutdownNow();
+ pushExecutor.shutdownNow();
}
private void resetNextFlush()
@@ -830,12 +850,18 @@ private ListenableFuture<?> abandonSegment(
final boolean removeOnDiskData
)
{
- // Mark this identifier as dropping, so no future merge tasks will pick it up.
+ // Ensure no future writes will be made to this sink.
+ sink.finishWriting();
+
+ // Mark this identifier as dropping, so no future push tasks will pick it up.
droppingSinks.add(identifier);
- // Wait for any outstanding merges to finish, then abandon the segment inside the persist thread.
+ // Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks).
+ rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
+
+ // Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread.
return Futures.transform(
- mergeBarrier(),
+ pushBarrier(),
new Function