@@ -753,9 +753,12 @@ private boolean isPaused()

  private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
  {
+   final int maxRowsInMemoryPerPartition = (tuningConfig.getMaxRowsInMemory() /
+       ioConfig.getStartPartitions().getPartitionOffsetMap().size());
    return Appenderators.createRealtime(
        dataSchema,
-       tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")),
+       tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist"))
+           .withMaxRowsInMemory(maxRowsInMemoryPerPartition),
        metrics,
        toolbox.getSegmentPusher(),
        toolbox.getObjectMapper(),
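The hunk above derives a per-partition row budget by dividing the task-wide maxRowsInMemory by the number of Kafka partitions the task reads. A worked example of the arithmetic, with hypothetical values (an illustration, not code from the PR):

```java
// Hypothetical values illustrating the per-partition split:
final int maxRowsInMemory = 75000; // tuningConfig.getMaxRowsInMemory()
final int partitionCount = 3;      // ioConfig.getStartPartitions().getPartitionOffsetMap().size()
final int maxRowsInMemoryPerPartition = maxRowsInMemory / partitionCount; // 25000 rows per partition
```

Note the integer division: any remainder is dropped, so the effective total budget can land slightly below the configured value.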
@@ -144,4 +144,19 @@ public KafkaTuningConfig withBasePersistDirectory(File dir)
        handoffConditionTimeout
    );
  }
+
+ public KafkaTuningConfig withMaxRowsInMemory(int rows)
+ {
+   return new KafkaTuningConfig(
+       rows,
+       maxRowsPerSegment,
+       intermediatePersistPeriod,
+       basePersistDirectory,
+       maxPendingPersists,
+       indexSpec,
+       buildV9Directly,
+       reportParseExceptions,
+       handoffConditionTimeout
+   );
+ }
}
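Like the existing withBasePersistDirectory above it, withMaxRowsInMemory is a copy constructor in method form: it builds a fresh KafkaTuningConfig with one field swapped out rather than mutating the receiver, so the calls chain. A sketch of the chained usage from the KafkaIndexTask hunk (tuningConfig and the other names are assumed from that surrounding context):

```java
// Chained copy-style reconfiguration, as in newAppenderator above; the
// variables are assumed to be in scope in the KafkaIndexTask.
final KafkaTuningConfig perTaskConfig = tuningConfig
    .withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist"))
    .withMaxRowsInMemory(maxRowsInMemoryPerPartition);
```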
@@ -33,7 +33,7 @@
 * An Appenderator indexes data. It has some in-memory data and some persisted-on-disk data. It can serve queries on
 * both of those. It can also push data to deep storage. But, it does not decide which segments data should go into.
 * It also doesn't publish segments to the metadata store or monitor handoff; you have to do that yourself!
- * <p/>
+ * <p>
 * Any time you call one of the methods that adds, persists, or pushes data, you must provide a Committer, or a
 * Supplier of one, that represents all data you have given to the Appenderator so far. The Committer will be used when
 * that data has been persisted to disk.
@@ -54,13 +54,13 @@ public interface Appenderator extends QuerySegmentWalker, Closeable

  /**
   * Add a row. Must not be called concurrently from multiple threads.
-  * <p/>
+  * <p>
   * If no pending segment exists for the provided identifier, a new one will be created.
-  * <p/>
+  * <p>
   * This method may trigger a {@link #persistAll(Committer)} using the supplied Committer. If it does this, the
   * Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used
   * asynchronously.
-  * <p/>
+  * <p>
   * The add, clear, persistAll, and push methods should all be called from the same thread.
   *
   * @param identifier the segment into which this row should be added
@@ -95,14 +95,16 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier<Committer> committe
   * Drop all in-memory and on-disk data, and forget any previously-remembered commit metadata. This could be useful if,
   * for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been
   * cleared. This may take some time, since all pending persists must finish first.
+  *
+  * The add, clear, persistAll, and push methods should all be called from the same thread.
   */
  void clear() throws InterruptedException;

  /**
   * Drop all data associated with a particular pending segment. Unlike {@link #clear()}, any on-disk commit
   * metadata will remain unchanged. If there is no pending segment with this identifier, then this method will
   * do nothing.
-  * <p/>
+  * <p>
   * You should not write to the dropped segment after calling "drop". If you need to drop all your data and
   * re-write it, consider {@link #clear()} instead.
   *
@@ -117,7 +119,7 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier<Committer> committe
   * machine's local disk. The Committer will be made synchronously with the call to persistAll, but will actually
   * be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to
   * disk.
-  * <p/>
+  * <p>
   * The add, clear, persistAll, and push methods should all be called from the same thread.
   *
   * @param committer a committer associated with all data that has been added so far
@@ -129,9 +131,9 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier<Committer> committe
  /**
   * Merge and push particular segments to deep storage. This will trigger an implicit {@link #persistAll(Committer)}
   * using the provided Committer.
-  * <p/>
+  * <p>
   * After this method is called, you cannot add new data to any segments that were previously under construction.
-  * <p/>
+  * <p>
   * The add, clear, persistAll, and push methods should all be called from the same thread.
   *
   * @param identifiers list of segments to push
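Taken together, the javadoc describes a single-threaded driver loop: every call that adds, persists, or pushes data takes a Committer (or a Supplier of one) snapshotting all data handed over so far. A hedged sketch of that call pattern (the variables are assumed to exist; the futures' type parameters follow the implementation that comes next):

```java
// Hedged sketch of the documented call sequence; identifier, row, and
// committerSupplier are assumed. add() may itself trigger an implicit persist.
final int numRows = appenderator.add(identifier, row, committerSupplier);

// Persist all in-memory data; the Committer's metadata is stored alongside it.
final ListenableFuture<Object> persistFuture =
    appenderator.persistAll(committerSupplier.get());

// Merge and push segments to deep storage (triggers an implicit persistAll).
final ListenableFuture<SegmentsAndMetadata> pushFuture =
    appenderator.push(ImmutableList.of(identifier), committerSupplier.get());
```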
@@ -18,6 +18,7 @@
package io.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -85,6 +86,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;

/**
*/
@@ -108,10 +110,11 @@ public class AppenderatorImpl implements Appenderator
  private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(
      String.CASE_INSENSITIVE_ORDER
  );
+ private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
  private final QuerySegmentWalker texasRanger;

  private volatile ListeningExecutorService persistExecutor = null;
- private volatile ListeningExecutorService mergeExecutor = null;
+ private volatile ListeningExecutorService pushExecutor = null;
  private volatile long nextFlush;
  private volatile FileLock basePersistDirLock = null;
  private volatile FileChannel basePersistDirLockChannel = null;
@@ -188,26 +191,34 @@ public int add(
    }

    final Sink sink = getOrCreateSink(identifier);
-   int sinkRetVal;
+   final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
+   final int sinkRowsInMemoryAfterAdd;

    try {
-     sinkRetVal = sink.add(row);
+     sinkRowsInMemoryAfterAdd = sink.add(row);
    }
    catch (IndexSizeExceededException e) {
-     // Try one more time after swapping, then throw the exception out if it happens again.
-     persistAll(committerSupplier.get());
-     sinkRetVal = sink.add(row);
+     // Uh oh, we can't do anything about this! We can't persist (commit metadata would be out of sync) and we
+     // can't add the row (it just failed). This should never actually happen, though, because we check
+     // sink.canAddRow after returning from add.
+     log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier);
+     throw e;
    }

-   if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
-     persistAll(committerSupplier.get());
+   if (sinkRowsInMemoryAfterAdd < 0) {
+     throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
    }

-   if (sinkRetVal < 0) {
-     throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
-   } else {
-     return sink.getNumRows();
+   rowsCurrentlyInMemory.addAndGet(sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd);
+
+   if (!sink.canAppendRow()
+       || System.currentTimeMillis() > nextFlush
+       || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
+     // persistAll clears rowsCurrentlyInMemory, no need to update it.
+     persistAll(committerSupplier.get());
    }
+
+   return sink.getNumRows();
  }

@Override
@@ -228,6 +239,12 @@ public int getRowCount(final SegmentIdentifier identifier)
    }
  }

+ @VisibleForTesting
+ int getRowsInMemory()
+ {
+   return rowsCurrentlyInMemory.get();
+ }
+
  private Sink getOrCreateSink(final SegmentIdentifier identifier)
  {
    Sink retVal = sinks.get(identifier);
@@ -410,6 +427,9 @@ public String apply(Map.Entry<SegmentIdentifier, Integer> entry)
    runExecStopwatch.stop();
    resetNextFlush();

+   // NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes.
+   rowsCurrentlyInMemory.set(0);
+
    return future;
  }

@@ -455,7 +475,7 @@ public SegmentsAndMetadata apply(Object commitMetadata)
            return new SegmentsAndMetadata(dataSegments, commitMetadata);
          }
        },
-       mergeExecutor
+       pushExecutor
    );
  }

@@ -464,9 +484,9 @@
   * This is useful if we're going to do something that would otherwise potentially break currently in-progress
   * pushes.
   */
- private ListenableFuture<?> mergeBarrier()
+ private ListenableFuture<?> pushBarrier()
  {
-   return mergeExecutor.submit(
+   return pushExecutor.submit(
        new Runnable()
        {
          @Override
@@ -480,7 +500,7 @@ public void run()

  /**
   * Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only
-  * be run in the single-threaded mergeExecutor.
+  * be run in the single-threaded pushExecutor.
   *
   * @param identifier sink identifier
   * @param sink       sink to push
@@ -589,7 +609,7 @@ public void close()
    try {
      shutdownExecutors();
      Preconditions.checkState(persistExecutor.awaitTermination(365, TimeUnit.DAYS), "persistExecutor not terminated");
-     Preconditions.checkState(mergeExecutor.awaitTermination(365, TimeUnit.DAYS), "mergeExecutor not terminated");
+     Preconditions.checkState(pushExecutor.awaitTermination(365, TimeUnit.DAYS), "pushExecutor not terminated");
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
@@ -647,9 +667,9 @@ private void initializeExecutors()
          )
      );
    }
-   if (mergeExecutor == null) {
+   if (pushExecutor == null) {
      // use a blocking single threaded executor to throttle the firehose when write to disk is slow
-     mergeExecutor = MoreExecutors.listeningDecorator(
+     pushExecutor = MoreExecutors.listeningDecorator(
          Execs.newBlockingSingleThreaded(
              "appenderator_merge_%d", 1
          )
@@ -660,7 +680,7 @@ private void shutdownExecutors()
  private void shutdownExecutors()
  {
    persistExecutor.shutdownNow();
-   mergeExecutor.shutdownNow();
+   pushExecutor.shutdownNow();
  }

private void resetNextFlush()
@@ -830,12 +850,18 @@ private ListenableFuture<?> abandonSegment(
      final boolean removeOnDiskData
  )
  {
-   // Mark this identifier as dropping, so no future merge tasks will pick it up.
+   // Ensure no future writes will be made to this sink.
+   sink.finishWriting();
+
+   // Mark this identifier as dropping, so no future push tasks will pick it up.
    droppingSinks.add(identifier);

-   // Wait for any outstanding merges to finish, then abandon the segment inside the persist thread.
+   // Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks).
+   rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
+
+   // Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread.
    return Futures.transform(
-       mergeBarrier(),
+       pushBarrier(),
        new Function<Object, Object>()
        {
          @Nullable
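The mergeBarrier-to-pushBarrier rename tracks the executor rename, but the idiom is unchanged: because pushExecutor is single-threaded and runs tasks in submission order, submitting a no-op task and waiting on its future guarantees every previously submitted push has completed. A standalone sketch of that barrier idiom (plain java.util.concurrent, not Druid code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BarrierSketch
{
  public static void main(String[] args) throws Exception
  {
    final ExecutorService pushExecutor = Executors.newSingleThreadExecutor();
    pushExecutor.submit(BarrierSketch::simulateSlowPush); // an in-flight "push"

    // The no-op barrier task cannot start until the slow push has finished.
    final Future<?> barrier = pushExecutor.submit(() -> {});
    barrier.get(); // returns only once all earlier tasks are done

    pushExecutor.shutdown();
  }

  private static void simulateSlowPush()
  {
    try {
      Thread.sleep(100); // stand-in for merge-and-push work
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```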
server/src/main/java/io/druid/segment/realtime/plumber/Sink.java (12 additions, 0 deletions)
@@ -228,6 +228,18 @@ public int getNumRows()
    }
  }

+ public int getNumRowsInMemory()
+ {
+   synchronized (hydrantLock) {
+     IncrementalIndex index = currHydrant.getIndex();
+     if (index == null) {
+       return 0;
+     }
+
+     return currHydrant.getIndex().size();
+   }
+ }
+
  private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
  {
    final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
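getNumRowsInMemory reports only the current hydrant's incremental index, which is what AppenderatorImpl's before/after delta accounting needs: persisted hydrants no longer count, and an added row that rolls up into an existing one produces a delta of zero. Restated from the add() hunk above:

```java
// Delta accounting from AppenderatorImpl.add, restated from the diff above:
final int before = sink.getNumRowsInMemory();    // size of the current in-memory index
final int after = sink.add(row);                 // post-add in-memory count (< 0 if swapped out)
rowsCurrentlyInMemory.addAndGet(after - before); // zero when the row rolled up
```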
@@ -135,6 +135,56 @@ public SegmentIdentifier apply(DataSegment input)
    }
  }

+ @Test
+ public void testMaxRowsInMemory() throws Exception
+ {
+   try (final AppenderatorTester tester = new AppenderatorTester(3)) {
+     final Appenderator appenderator = tester.getAppenderator();
+     final AtomicInteger eventCount = new AtomicInteger(0);
+     final Supplier<Committer> committerSupplier = new Supplier<Committer>()
+     {
+       @Override
+       public Committer get()
+       {
+         final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
+
+         return new Committer()
+         {
+           @Override
+           public Object getMetadata()
+           {
+             return metadata;
+           }
+
+           @Override
+           public void run()
+           {
+             // Do nothing
+           }
+         };
+       }
+     };
+
+     Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+     appenderator.startJob();
+     Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+     appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
+     Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
+     appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
+     Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+     appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
+     Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+     appenderator.add(IDENTIFIERS.get(0), IR("2000", "baz", 1), committerSupplier);
+     Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+     appenderator.add(IDENTIFIERS.get(1), IR("2000", "qux", 1), committerSupplier);
+     Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
+     appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier);
+     Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+     appenderator.close();
+     Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+   }
+ }
+
  @Test
  public void testRestoreFromDisk() throws Exception
  {
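For reference, the assertion sequence in testMaxRowsInMemory matches the counter semantics introduced above, assuming AppenderatorTester(3) sets maxRowsInMemory to 3: the repeated "bar" row presumably rolls up into the existing in-memory row, so the count stays at 2; the following "baz" add brings the total to 3, tripping the implicit persistAll that resets the counter to 0; and close() abandons the remaining sinks, returning the counter to 0.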