From f83f29c79539d52109c3acc183ba3e3bc1e1faa5 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 25 Jul 2016 13:57:35 -0700 Subject: [PATCH] Be more respectful of maxRowsInMemory. - Appenderator: Respect maxRowsInMemory across all sinks. - KafkaIndexTask: Respect maxRowsInMemory across all partitions. --- .../druid/indexing/kafka/KafkaIndexTask.java | 5 +- .../indexing/kafka/KafkaTuningConfig.java | 15 ++++ .../realtime/appenderator/Appenderator.java | 18 ++--- .../appenderator/AppenderatorImpl.java | 72 +++++++++++++------ .../druid/segment/realtime/plumber/Sink.java | 12 ++++ .../appenderator/AppenderatorTest.java | 50 +++++++++++++ 6 files changed, 140 insertions(+), 32 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 4eaca8ea6ecd..3639bebd62a5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -753,9 +753,12 @@ private boolean isPaused() private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) { + final int maxRowsInMemoryPerPartition = (tuningConfig.getMaxRowsInMemory() / + ioConfig.getStartPartitions().getPartitionOffsetMap().size()); return Appenderators.createRealtime( dataSchema, - tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")), + tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")) + .withMaxRowsInMemory(maxRowsInMemoryPerPartition), metrics, toolbox.getSegmentPusher(), toolbox.getObjectMapper(), diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index 374b2dec909c..8a60822fa0f7 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -144,4 +144,19 @@ public KafkaTuningConfig withBasePersistDirectory(File dir) handoffConditionTimeout ); } + + public KafkaTuningConfig withMaxRowsInMemory(int rows) + { + return new KafkaTuningConfig( + rows, + maxRowsPerSegment, + intermediatePersistPeriod, + basePersistDirectory, + maxPendingPersists, + indexSpec, + buildV9Directly, + reportParseExceptions, + handoffConditionTimeout + ); + } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java index 7d8e3d60fa2f..be305c5c13ea 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java @@ -33,7 +33,7 @@ * An Appenderator indexes data. It has some in-memory data and some persisted-on-disk data. It can serve queries on * both of those. It can also push data to deep storage. But, it does not decide which segments data should go into. * It also doesn't publish segments to the metadata store or monitor handoff; you have to do that yourself! - *
+ * <p>
* Any time you call one of the methods that adds, persists, or pushes data, you must provide a Committer, or a * Supplier of one, that represents all data you have given to the Appenderator so far. The Committer will be used when * that data has been persisted to disk. @@ -54,13 +54,13 @@ public interface Appenderator extends QuerySegmentWalker, Closeable /** * Add a row. Must not be called concurrently from multiple threads. - *
+ * <p>
* If no pending segment exists for the provided identifier, a new one will be created. - *
+ * <p>
* This method may trigger a {@link #persistAll(Committer)} using the supplied Committer. If it does this, the * Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used * asynchronously. - *
+ * <p>
* The add, clear, persistAll, and push methods should all be called from the same thread. * * @param identifier the segment into which this row should be added @@ -95,6 +95,8 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe * Drop all in-memory and on-disk data, and forget any previously-remembered commit metadata. This could be useful if, * for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been * cleared. This may take some time, since all pending persists must finish first. + * + * The add, clear, persistAll, and push methods should all be called from the same thread. */ void clear() throws InterruptedException; @@ -102,7 +104,7 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe * Drop all data associated with a particular pending segment. Unlike {@link #clear()}), any on-disk commit * metadata will remain unchanged. If there is no pending segment with this identifier, then this method will * do nothing. - *
+ * <p>
* You should not write to the dropped segment after calling "drop". If you need to drop all your data and * re-write it, consider {@link #clear()} instead. * @@ -117,7 +119,7 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe * machine's local disk. The Committer will be made synchronously will the call to persistAll, but will actually * be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to * disk. - *
+ * <p>
* The add, clear, persistAll, and push methods should all be called from the same thread. * * @param committer a committer associated with all data that has been added so far @@ -129,9 +131,9 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe /** * Merge and push particular segments to deep storage. This will trigger an implicit {@link #persistAll(Committer)} * using the provided Committer. - *
+ * <p>
* After this method is called, you cannot add new data to any segments that were previously under construction. - *
+ * <p>
* The add, clear, persistAll, and push methods should all be called from the same thread. * * @param identifiers list of segments to push diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 9d013ea7c507..3ab523e266c6 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -18,6 +18,7 @@ package io.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -85,6 +86,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -108,10 +110,11 @@ public class AppenderatorImpl implements Appenderator private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline<>( String.CASE_INSENSITIVE_ORDER ); + private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger(); private final QuerySegmentWalker texasRanger; private volatile ListeningExecutorService persistExecutor = null; - private volatile ListeningExecutorService mergeExecutor = null; + private volatile ListeningExecutorService pushExecutor = null; private volatile long nextFlush; private volatile FileLock basePersistDirLock = null; private volatile FileChannel basePersistDirLockChannel = null; @@ -188,26 +191,34 @@ public int add( } final Sink sink = getOrCreateSink(identifier); - int sinkRetVal; + final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory(); + final int sinkRowsInMemoryAfterAdd; try { - sinkRetVal = sink.add(row); + sinkRowsInMemoryAfterAdd = sink.add(row); } catch (IndexSizeExceededException e) { - // Try one more time after swapping, then throw the exception out if it happens again. - persistAll(committerSupplier.get()); - sinkRetVal = sink.add(row); + // Uh oh, we can't do anything about this! We can't persist (commit metadata would be out of sync) and we + // can't add the row (it just failed). This should never actually happen, though, because we check + // sink.canAddRow after returning from add. + log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier); + throw e; } - if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) { - persistAll(committerSupplier.get()); + if (sinkRowsInMemoryAfterAdd < 0) { + throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier); } - if (sinkRetVal < 0) { - throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier); - } else { - return sink.getNumRows(); + rowsCurrentlyInMemory.addAndGet(sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd); + + if (!sink.canAppendRow() + || System.currentTimeMillis() > nextFlush + || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) { + // persistAll clears rowsCurrentlyInMemory, no need to update it. 
+ persistAll(committerSupplier.get()); } + + return sink.getNumRows(); } @Override @@ -228,6 +239,12 @@ public int getRowCount(final SegmentIdentifier identifier) } } + @VisibleForTesting + int getRowsInMemory() + { + return rowsCurrentlyInMemory.get(); + } + private Sink getOrCreateSink(final SegmentIdentifier identifier) { Sink retVal = sinks.get(identifier); @@ -410,6 +427,9 @@ public String apply(Map.Entry entry) runExecStopwatch.stop(); resetNextFlush(); + // NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes. + rowsCurrentlyInMemory.set(0); + return future; } @@ -455,7 +475,7 @@ public SegmentsAndMetadata apply(Object commitMetadata) return new SegmentsAndMetadata(dataSegments, commitMetadata); } }, - mergeExecutor + pushExecutor ); } @@ -464,9 +484,9 @@ public SegmentsAndMetadata apply(Object commitMetadata) * This is useful if we're going to do something that would otherwise potentially break currently in-progress * pushes. */ - private ListenableFuture mergeBarrier() + private ListenableFuture pushBarrier() { - return mergeExecutor.submit( + return pushExecutor.submit( new Runnable() { @Override @@ -480,7 +500,7 @@ public void run() /** * Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only - * be run in the single-threaded mergeExecutor. + * be run in the single-threaded pushExecutor. * * @param identifier sink identifier * @param sink sink to push @@ -589,7 +609,7 @@ public void close() try { shutdownExecutors(); Preconditions.checkState(persistExecutor.awaitTermination(365, TimeUnit.DAYS), "persistExecutor not terminated"); - Preconditions.checkState(mergeExecutor.awaitTermination(365, TimeUnit.DAYS), "mergeExecutor not terminated"); + Preconditions.checkState(pushExecutor.awaitTermination(365, TimeUnit.DAYS), "pushExecutor not terminated"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -647,9 +667,9 @@ private void initializeExecutors() ) ); } - if (mergeExecutor == null) { + if (pushExecutor == null) { // use a blocking single threaded executor to throttle the firehose when write to disk is slow - mergeExecutor = MoreExecutors.listeningDecorator( + pushExecutor = MoreExecutors.listeningDecorator( Execs.newBlockingSingleThreaded( "appenderator_merge_%d", 1 ) @@ -660,7 +680,7 @@ private void initializeExecutors() private void shutdownExecutors() { persistExecutor.shutdownNow(); - mergeExecutor.shutdownNow(); + pushExecutor.shutdownNow(); } private void resetNextFlush() @@ -830,12 +850,18 @@ private ListenableFuture abandonSegment( final boolean removeOnDiskData ) { - // Mark this identifier as dropping, so no future merge tasks will pick it up. + // Ensure no future writes will be made to this sink. + sink.finishWriting(); + + // Mark this identifier as dropping, so no future push tasks will pick it up. droppingSinks.add(identifier); - // Wait for any outstanding merges to finish, then abandon the segment inside the persist thread. + // Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks). + rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory()); + + // Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread. 
return Futures.transform( - mergeBarrier(), + pushBarrier(), new Function() { @Nullable diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 91326192fdd9..29e97dbd5464 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -228,6 +228,18 @@ public int getNumRows() } } + public int getNumRowsInMemory() + { + synchronized (hydrantLock) { + IncrementalIndex index = currHydrant.getIndex(); + if (index == null) { + return 0; + } + + return currHydrant.getIndex().size(); + } + } + private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) { final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java index 12e9e364dc42..620abdc84088 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java @@ -135,6 +135,56 @@ public SegmentIdentifier apply(DataSegment input) } } + @Test + public void testMaxRowsInMemory() throws Exception + { + try (final AppenderatorTester tester = new AppenderatorTester(3)) { + final Appenderator appenderator = tester.getAppenderator(); + final AtomicInteger eventCount = new AtomicInteger(0); + final Supplier committerSupplier = new Supplier() + { + @Override + public Committer get() + { + final Object metadata = ImmutableMap.of("eventCount", eventCount.get()); + + return new Committer() + { + @Override + public Object getMetadata() + { + return metadata; + } + + @Override + public void run() + { + // Do nothing + } + }; + } + }; + + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.startJob(); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier); + Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier); + Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier); + Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(IDENTIFIERS.get(0), IR("2000", "baz", 1), committerSupplier); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(IDENTIFIERS.get(1), IR("2000", "qux", 1), committerSupplier); + Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier); + Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.close(); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + } + } + @Test public void testRestoreFromDisk() throws Exception {
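
For reference, below is a minimal sketch of the persist trigger this patch leaves in AppenderatorImpl.add(), with the diff markers stripped. Names match the diff above; the enclosing class, the IndexSizeExceededException handling, and the KafkaIndexTask side (which simply divides the configured maxRowsInMemory by the number of Kafka partitions assigned to the task before building its appenderator) are elided.

    // rowsCurrentlyInMemory is the new AtomicInteger shared by every sink in this appenderator.
    final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
    final int sinkRowsInMemoryAfterAdd = sink.add(row);

    // Track the delta, so the counter reflects rows held across all sinks rather than per sink.
    rowsCurrentlyInMemory.addAndGet(sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd);

    if (!sink.canAppendRow()
        || System.currentTimeMillis() > nextFlush
        || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
      // persistAll() resets rowsCurrentlyInMemory to zero after scheduling the persist,
      // so no explicit decrement is needed here.
      persistAll(committerSupplier.get());
    }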