From e738c240f2030b5c5d379d57f4e25aee596d0ea1 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Fri, 16 Apr 2021 09:17:52 -0700 Subject: [PATCH 01/21] Avoid mapping hydrants in create segments phase for native ingestion --- .../druid/segment/QueryableIndexSegment.java | 47 +++++++++++++++---- .../appenderator/AppenderatorImpl.java | 35 ++++++++++---- 2 files changed, 66 insertions(+), 16 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java index a829dfae184b..d324f1388c1c 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java @@ -19,21 +19,52 @@ package org.apache.druid.segment; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; /** + * */ public class QueryableIndexSegment implements Segment { - private final QueryableIndex index; - private final QueryableIndexStorageAdapter storageAdapter; + private final Supplier indexSupplier; + private final Supplier queryableIndexStorageAdapterSupplier; private final SegmentId segmentId; public QueryableIndexSegment(QueryableIndex index, final SegmentId segmentId) { - this.index = index; - this.storageAdapter = new QueryableIndexStorageAdapter(index); + this.indexSupplier = new Supplier() + { + @Override + public QueryableIndex get() + { + return index; + } + }; + this.queryableIndexStorageAdapterSupplier = Suppliers.memoize(new Supplier() + { + @Override + public QueryableIndexStorageAdapter get() + { + return new QueryableIndexStorageAdapter(index); + } + }); + this.segmentId = segmentId; + } + + public QueryableIndexSegment(Supplier indexSupplier, final SegmentId segmentId) + { + this.indexSupplier = indexSupplier; + this.queryableIndexStorageAdapterSupplier = 
Suppliers.memoize(new Supplier() + { + @Override + public QueryableIndexStorageAdapter get() + { + return new QueryableIndexStorageAdapter(indexSupplier.get()); + } + }); this.segmentId = segmentId; } @@ -46,25 +77,25 @@ public SegmentId getId() @Override public Interval getDataInterval() { - return index.getDataInterval(); + return indexSupplier.get().getDataInterval(); } @Override public QueryableIndex asQueryableIndex() { - return index; + return indexSupplier.get(); } @Override public StorageAdapter asStorageAdapter() { - return storageAdapter; + return queryableIndexStorageAdapterSupplier.get(); } @Override public void close() { // this is kinda nasty - index.close(); + indexSupplier.get().close(); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 6f98817030e2..8a523e9567a5 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -26,6 +26,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -163,10 +164,10 @@ public class AppenderatorImpl implements Appenderator /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. - * + *

* The sinkTimeline is set to the sink timeline of the provided SinkQuerySegmentWalker. * If the SinkQuerySegmentWalker is null, a new sink timeline is initialized. - * + *

* It is used by UnifiedIndexerAppenderatorsManager which allows queries on data associated with multiple * Appenderators. */ @@ -347,7 +348,8 @@ public AppenderatorAddResult add( } } - if (!skipBytesInMemoryOverheadCheck && bytesCurrentlyInMemory.get() - bytesToBePersisted > maxBytesTuningConfig) { + if (!skipBytesInMemoryOverheadCheck + && bytesCurrentlyInMemory.get() - bytesToBePersisted > maxBytesTuningConfig) { // We are still over maxBytesTuningConfig even after persisting. // This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion) final String alertMessage = StringUtils.format( @@ -737,7 +739,8 @@ public ListenableFuture push( private ListenableFuture pushBarrier() { return intermediateTempExecutor.submit( - (Runnable) () -> pushExecutor.submit(() -> {}) + (Runnable) () -> pushExecutor.submit(() -> { + }) ); } @@ -748,7 +751,6 @@ private ListenableFuture pushBarrier() * @param identifier sink identifier * @param sink sink to push * @param useUniquePath true if the segment should be written to a path with a unique identifier - * * @return segment descriptor, or null if the sink is no longer valid */ @Nullable @@ -849,7 +851,11 @@ private DataSegment mergeAndPush( // semantics. 
() -> dataSegmentPusher.push( mergedFile, - sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes, schema.getDimensionsSpec())), + sink.getSegment() + .withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes( + indexes, + schema.getDimensionsSpec() + )), useUniquePath ), exception -> exception instanceof Exception, @@ -1374,7 +1380,6 @@ private File createPersistDirIfNeeded(SegmentIdWithShardSpec identifier) throws * * @param indexToPersist hydrant to persist * @param identifier the segment this hydrant is going to be part of - * * @return the number of rows persisted */ private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec identifier) @@ -1413,8 +1418,22 @@ private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec id numRows ); + Supplier memoizedIndexSupplier = + Suppliers.memoize(new Supplier() + { + @Override + public QueryableIndex get() + { + try { + return indexIO.loadIndex(persistedFile); + } + catch (IOException e) { + throw new RE(e, "Error while loading index"); + } + } + }); indexToPersist.swapSegment( - new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId()) + new QueryableIndexSegment(memoizedIndexSupplier, indexToPersist.getSegmentId()) ); return numRows; From 3ffd5927f56282cd123aa17f34ff5d2f3933dab0 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Fri, 16 Apr 2021 11:32:34 -0700 Subject: [PATCH 02/21] Drop queriable indices after a given sink is fully merged --- .../druid/segment/QueryableIndexSegment.java | 37 ++++++++++--------- .../appenderator/AppenderatorImpl.java | 13 ++++++- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java index d324f1388c1c..f867e88c9abf 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java 
+++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java @@ -33,41 +33,44 @@ public class QueryableIndexSegment implements Segment private final Supplier queryableIndexStorageAdapterSupplier; private final SegmentId segmentId; - public QueryableIndexSegment(QueryableIndex index, final SegmentId segmentId) + + /** + * This constructor is to support passing a memoized supplier to have this lazily initialized. + * @param indexSupplier A supplier that may be memoized (or not) + * @param segmentId The id of the segment for the index + */ + public QueryableIndexSegment(Supplier indexSupplier, final SegmentId segmentId) { - this.indexSupplier = new Supplier() - { - @Override - public QueryableIndex get() - { - return index; - } - }; + this.indexSupplier = indexSupplier; this.queryableIndexStorageAdapterSupplier = Suppliers.memoize(new Supplier() { @Override public QueryableIndexStorageAdapter get() { - return new QueryableIndexStorageAdapter(index); + return new QueryableIndexStorageAdapter(indexSupplier.get()); } }); this.segmentId = segmentId; } - public QueryableIndexSegment(Supplier indexSupplier, final SegmentId segmentId) + /** + * + * @param index The index to back this queryable index + * @param segmentId The id of the segment for the index + */ + public QueryableIndexSegment(QueryableIndex index, final SegmentId segmentId) { - this.indexSupplier = indexSupplier; - this.queryableIndexStorageAdapterSupplier = Suppliers.memoize(new Supplier() + this(new Supplier() { @Override - public QueryableIndexStorageAdapter get() + public QueryableIndex get() { - return new QueryableIndexStorageAdapter(indexSupplier.get()); + return index; } - }); - this.segmentId = segmentId; + }, segmentId); } + @Override public SegmentId getId() { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 
8a523e9567a5..d713f39accd3 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -862,6 +862,13 @@ private DataSegment mergeAndPush( 5 ); + // Drop the queriable indexes behind the hydrants... they are not needed anymore and their + // mapped file references + // can generate OOMs during merge if enough of them are held back... + for (FireHydrant fireHydrant : sink) { + fireHydrant.swapSegment(null); + } + final long pushFinishTime = System.nanoTime(); objectMapper.writeValue(descriptorFile, segment); @@ -1418,7 +1425,9 @@ private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec id numRows ); - Supplier memoizedIndexSupplier = + // Make the queryable index lazy to avoid it during segment creation for native batch ingestion + // (in order to ameliorate OOMs due to needing to hold too many hydrants) + final Supplier memoizedIndexSupplier = Suppliers.memoize(new Supplier() { @Override @@ -1428,7 +1437,7 @@ public QueryableIndex get() return indexIO.loadIndex(persistedFile); } catch (IOException e) { - throw new RE(e, "Error while loading index"); + throw new RE(e, "Error while loading index fo file [%s]", persistedFile.getAbsolutePath()); } } }); From 98781c4c865c946d270ed8ac6c485fca934d5698 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Mon, 19 Apr 2021 11:23:04 -0700 Subject: [PATCH 03/21] Do not drop memory mappings for realtime ingestion --- .../realtime/appenderator/Appenderator.java | 7 ++++++ .../appenderator/AppenderatorImpl.java | 24 ++++++++++++++----- .../realtime/appenderator/Appenderators.java | 6 +++-- .../UnifiedIndexerAppenderatorsManager.java | 3 ++- .../StreamAppenderatorDriverFailTest.java | 5 ++++ 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java index 14796df44a2a..26e3d25e918b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java @@ -214,6 +214,13 @@ ListenableFuture push( */ void closeNow(); + /** + * Flag to tell internals whether physical segments (i.e. hydrants) need to memory map their persisted + * files. Batch ingest does not need to memory map them thus we use this flag to avoid that + * reducing the possibility of OOM's. + */ + boolean needsToMemoryMapIndex(); + /** * Result of {@link Appenderator#add} containing following information * - {@link SegmentIdWithShardSpec} - identifier of segment to which rows are being added diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index d713f39accd3..c6b2a38d6155 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -162,6 +162,8 @@ public class AppenderatorImpl implements Appenderator private volatile Throwable persistError; + private final boolean memoryMapIndexes; + /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. *

@@ -184,7 +186,8 @@ public class AppenderatorImpl implements Appenderator IndexMerger indexMerger, Cache cache, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean memoryMapIndexes ) { this.myId = id; @@ -200,6 +203,7 @@ public class AppenderatorImpl implements Appenderator this.texasRanger = sinkQuerySegmentWalker; this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters"); this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler"); + this.memoryMapIndexes = memoryMapIndexes; if (sinkQuerySegmentWalker == null) { this.sinkTimeline = new VersionedIntervalTimeline<>( @@ -862,11 +866,13 @@ private DataSegment mergeAndPush( 5 ); - // Drop the queriable indexes behind the hydrants... they are not needed anymore and their - // mapped file references - // can generate OOMs during merge if enough of them are held back... - for (FireHydrant fireHydrant : sink) { - fireHydrant.swapSegment(null); + if (!needsToMemoryMapIndex()) { + // Drop the queriable indexes behind the hydrants... they are not needed anymore and their + // mapped file references + // can generate OOMs during merge if enough of them are held back... 
+ for (FireHydrant fireHydrant : sink) { + fireHydrant.swapSegment(null); + } } final long pushFinishTime = System.nanoTime(); @@ -995,6 +1001,12 @@ public void closeNow() } } + @Override + public boolean needsToMemoryMapIndex() { + return memoryMapIndexes; + } + + private void lockBasePersistDirectory() { if (basePersistDirLock == null) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java index c59e4e053e50..00be53e8723f 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java @@ -89,7 +89,8 @@ public static Appenderator createRealtime( indexMerger, cache, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + true ); } @@ -119,7 +120,8 @@ public static Appenderator createOffline( indexMerger, null, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + false ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 369709882a76..755319623121 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -189,7 +189,8 @@ public Appenderator createRealtimeAppenderatorForTask( wrapIndexMerger(indexMerger), cache, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + true ); datasourceBundle.addAppenderator(taskId, appenderator); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index 4f9cd3c34158..e004acbfb010 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -545,5 +545,10 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable Date: Mon, 19 Apr 2021 12:39:07 -0700 Subject: [PATCH 04/21] Style fixes --- .../druid/segment/realtime/appenderator/AppenderatorImpl.java | 3 ++- .../appenderator/StreamAppenderatorDriverFailTest.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index c6b2a38d6155..c447d2506782 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -1002,7 +1002,8 @@ public void closeNow() } @Override - public boolean needsToMemoryMapIndex() { + public boolean needsToMemoryMapIndex() + { return memoryMapIndexes; } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index e004acbfb010..e2a4a209e063 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -546,7 +546,8 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable Date: Tue, 20 Apr 2021 12:19:24 -0700 Subject: [PATCH 05/21] Renamed to match use case better --- 
.../segment/realtime/appenderator/Appenderator.java | 10 ++++++---- .../realtime/appenderator/AppenderatorImpl.java | 12 ++++++------ .../StreamAppenderatorDriverFailTest.java | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java index 26e3d25e918b..2c61418730e0 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java @@ -215,11 +215,13 @@ ListenableFuture push( void closeNow(); /** - * Flag to tell internals whether physical segments (i.e. hydrants) need to memory map their persisted - * files. Batch ingest does not need to memory map them thus we use this flag to avoid that - * reducing the possibility of OOM's. + * Flag to tell internals whether appenderator is working on behalf of a real time task. + * This is to manage certain aspects as needed. For example, for batch, non-real time tasks, + * physical segments (i.e. hydrants) do not need to memory map their persisted + * files. In this case, the code will avoid memory mapping them thus ameliorating the occurance + * of OOMs. 
*/ - boolean needsToMemoryMapIndex(); + boolean isRealTime(); /** * Result of {@link Appenderator#add} containing following information diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index c447d2506782..6a4514dc7be9 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -162,7 +162,7 @@ public class AppenderatorImpl implements Appenderator private volatile Throwable persistError; - private final boolean memoryMapIndexes; + private final boolean isRealTime; /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. @@ -187,7 +187,7 @@ public class AppenderatorImpl implements Appenderator Cache cache, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler, - boolean memoryMapIndexes + boolean isRealTime ) { this.myId = id; @@ -203,7 +203,7 @@ public class AppenderatorImpl implements Appenderator this.texasRanger = sinkQuerySegmentWalker; this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters"); this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler"); - this.memoryMapIndexes = memoryMapIndexes; + this.isRealTime = isRealTime; if (sinkQuerySegmentWalker == null) { this.sinkTimeline = new VersionedIntervalTimeline<>( @@ -866,7 +866,7 @@ private DataSegment mergeAndPush( 5 ); - if (!needsToMemoryMapIndex()) { + if (!isRealTime()) { // Drop the queriable indexes behind the hydrants... they are not needed anymore and their // mapped file references // can generate OOMs during merge if enough of them are held back... 
@@ -1002,9 +1002,9 @@ public void closeNow() } @Override - public boolean needsToMemoryMapIndex() + public boolean isRealTime() { - return memoryMapIndexes; + return isRealTime; } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index e2a4a209e063..408aa97e400b 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -546,7 +546,7 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable Date: Tue, 20 Apr 2021 14:11:23 -0700 Subject: [PATCH 06/21] Rollback memoization code and use the real time flag instead --- .../druid/segment/QueryableIndexSegment.java | 52 ++++--------------- .../druid/segment/realtime/FireHydrant.java | 29 +++++++++++ .../appenderator/AppenderatorImpl.java | 52 +++++++++++-------- 3 files changed, 69 insertions(+), 64 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java index f867e88c9abf..a829dfae184b 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java @@ -19,58 +19,24 @@ package org.apache.druid.segment; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; /** - * */ public class QueryableIndexSegment implements Segment { - private final Supplier indexSupplier; - private final Supplier queryableIndexStorageAdapterSupplier; + private final QueryableIndex index; + private final QueryableIndexStorageAdapter storageAdapter; private final 
SegmentId segmentId; - - /** - * This constructor is to support passing a memoized supplier to have this lazily initialized. - * @param indexSupplier A supplier that may be memoized (or not) - * @param segmentId The id of the segment for the index - */ - public QueryableIndexSegment(Supplier indexSupplier, final SegmentId segmentId) - { - this.indexSupplier = indexSupplier; - this.queryableIndexStorageAdapterSupplier = Suppliers.memoize(new Supplier() - { - @Override - public QueryableIndexStorageAdapter get() - { - return new QueryableIndexStorageAdapter(indexSupplier.get()); - } - }); - this.segmentId = segmentId; - } - - /** - * - * @param index The index to back this queryable index - * @param segmentId The id of the segment for the index - */ public QueryableIndexSegment(QueryableIndex index, final SegmentId segmentId) { - this(new Supplier() - { - @Override - public QueryableIndex get() - { - return index; - } - }, segmentId); + this.index = index; + this.storageAdapter = new QueryableIndexStorageAdapter(index); + this.segmentId = segmentId; } - @Override public SegmentId getId() { @@ -80,25 +46,25 @@ public SegmentId getId() @Override public Interval getDataInterval() { - return indexSupplier.get().getDataInterval(); + return index.getDataInterval(); } @Override public QueryableIndex asQueryableIndex() { - return indexSupplier.get(); + return index; } @Override public StorageAdapter asStorageAdapter() { - return queryableIndexStorageAdapterSupplier.get(); + return storageAdapter; } @Override public void close() { // this is kinda nasty - indexSupplier.get().close(); + index.close(); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java index 9c59387d102d..5a20cd93201b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java @@ -34,6 +34,7 @@ 
import javax.annotation.Nullable; import java.io.Closeable; +import java.io.File; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -45,6 +46,34 @@ public class FireHydrant private final int count; private final AtomicReference adapter; private volatile IncrementalIndex index; + private File persistedFile; + private SegmentId persistedSegmentId; + + /** + * + * @return The persisted file path, this will be null for real time hydrants + */ + public @Nullable File getPersistedFile() + { + return persistedFile; + } + + public @Nullable SegmentId getPersistedSegmentId() { + return persistedSegmentId; + } + + /** + * This is to support the case of batch ingestion where the hydrant is persisted but not memory mapped, + * we need to remember the persisted file path in order to regenerate the memory mapped index to + * prepare for merging after all phyisical segmetns are built + * @param persistedFile The file that was persisted when this hydrant was persisted + */ + public void setPersistedMetadata(File persistedFile,SegmentId segmentId) + { + this.persistedFile = persistedFile; + this.persistedSegmentId = segmentId; + } + public FireHydrant(IncrementalIndex index, int count, SegmentId segmentId) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 6a4514dc7be9..32975164ae90 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -26,7 +26,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import 
com.google.common.collect.Lists; @@ -60,6 +59,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.ReferenceCountingSegment; +import org.apache.druid.segment.Segment; import org.apache.druid.segment.incremental.IncrementalIndexAddResult; import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -819,6 +819,24 @@ private DataSegment mergeAndPush( Closer closer = Closer.create(); try { for (FireHydrant fireHydrant : sink) { + + // if batch, swap/persist did not memory map the incremental index, we need it mapped now: + if (!isRealTime()) { + // sanity: + if (fireHydrant.getPersistedFile() == null) { + throw new ISE("Persisted file for batch hydrant is null!"); + } else if (fireHydrant.getPersistedSegmentId() == null) { + throw new ISE( + "Persisted segmentId for batch hydrant in file [%s] is null!", + fireHydrant.getPersistedFile().getPath() + ); + } + fireHydrant.swapSegment(new QueryableIndexSegment( + indexIO.loadIndex(fireHydrant.getPersistedFile()), + fireHydrant.getPersistedSegmentId() + )); + } + Pair segmentAndCloseable = fireHydrant.getAndIncrementSegment(); final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex(); log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant); @@ -894,11 +912,13 @@ private DataSegment mergeAndPush( return segment; } - catch (Exception e) { + catch ( + Exception e) { metrics.incrementFailedHandoffs(); log.warn(e, "Failed to push merged index for segment[%s].", identifier); throw new RuntimeException(e); } + } @Override @@ -1438,25 +1458,15 @@ private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec id numRows ); - // Make the queryable index lazy to avoid it during segment creation for native batch ingestion - // (in order to ameliorate OOMs due to needing to hold too many hydrants) - final Supplier 
memoizedIndexSupplier = - Suppliers.memoize(new Supplier() - { - @Override - public QueryableIndex get() - { - try { - return indexIO.loadIndex(persistedFile); - } - catch (IOException e) { - throw new RE(e, "Error while loading index fo file [%s]", persistedFile.getAbsolutePath()); - } - } - }); - indexToPersist.swapSegment( - new QueryableIndexSegment(memoizedIndexSupplier, indexToPersist.getSegmentId()) - ); + // Do not map unless it is being driven by a real time task: + Segment segmentToSwap = null; + if (isRealTime()) { + segmentToSwap = new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId()); + } else { + // remember file path & segment id to rebuild the queryable index for merge: + indexToPersist.setPersistedMetadata(persistedFile, indexToPersist.getSegmentId()); + } + indexToPersist.swapSegment(segmentToSwap); return numRows; } From 78b1884a9f043849db8740d3699b6167f68d1213 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Wed, 21 Apr 2021 09:23:29 -0700 Subject: [PATCH 07/21] Null ptr fix in FireHydrant toString plus adjustments to memory pressure tracking calculations --- .../druid/segment/realtime/FireHydrant.java | 2 +- .../appenderator/AppenderatorImpl.java | 24 +++++++++++++------ .../segment/realtime/FireHydrantTest.java | 7 ++++++ 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java index 5a20cd93201b..65a7d129f2ce 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java @@ -242,7 +242,7 @@ public String toString() // Do not include IncrementalIndex in toString as AbstractIndex.toString() actually prints // all the rows in the index return "FireHydrant{" + - "queryable=" + adapter.get().getId() + + "queryable=" + (adapter.get() == null ? 
"null" : adapter.get().getId()) + ", count=" + count + '}'; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 32975164ae90..e5b892c02e0d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -344,7 +344,8 @@ public AppenderatorAddResult add( if (sinkEntry != null) { bytesToBePersisted += sinkEntry.getBytesInMemory(); if (sinkEntry.swappable()) { - // After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory. + // After swapping the sink, we use memory mapped segment instead (but only for real time appenderators!). + // However, the memory mapped segment still consumes memory. // These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory int memoryStillInUse = calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant()); bytesCurrentlyInMemory.addAndGet(memoryStillInUse); @@ -358,10 +359,14 @@ public AppenderatorAddResult add( // This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion) final String alertMessage = StringUtils.format( "Task has exceeded safe estimated heap usage limits, failing " - + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])", + + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])" + + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])", sinks.size(), sinks.values().stream().mapToInt(Iterables::size).sum(), - getTotalRowCount() + getTotalRowCount(), + bytesCurrentlyInMemory.get(), + bytesToBePersisted, + maxBytesTuningConfig ); final String errorMessage = StringUtils.format( "%s.\nThis can occur when 
the overhead from too many intermediary segment persists becomes to " @@ -1504,10 +1509,15 @@ private int calculateMMappedHydrantMemoryInUsed(FireHydrant hydrant) // These calculations are approximated from actual heap dumps. // Memory footprint includes count integer in FireHydrant, shorts in ReferenceCountingSegment, // Objects in SimpleQueryableIndex (such as SmooshedFileMapper, each ColumnHolder in column map, etc.) - return Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT + - (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) + - (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) + - ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER; + int total; + total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT; + if (isRealTime()) { + // for real time ad references to byte memory mapped references.. + total += (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) + + (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) + + ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER; + } + return total; } private int calculateSinkMemoryInUsed(Sink sink) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java index 4f288426e50f..464141a42a11 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java @@ -210,4 +210,11 @@ public void testGetSegmentForQueryButNotAbleToAcquireReferencesSegmentClosed() Function.identity() ); } + + @Test + public void testToStringWhenSwappedWithNull() + { + hydrant.swapSegment(null); + hydrant.toString(); + } } From f0a08d691966bccf5d6b86ebe47e72f571535cd4 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Wed, 21 Apr 2021 13:44:37 -0700 Subject: [PATCH 08/21] Style --- 
.../org/apache/druid/segment/realtime/FireHydrant.java | 9 +++++---- .../segment/realtime/appenderator/AppenderatorImpl.java | 3 +-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java index 65a7d129f2ce..b96e49327919 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java @@ -58,17 +58,18 @@ public class FireHydrant return persistedFile; } - public @Nullable SegmentId getPersistedSegmentId() { + public @Nullable SegmentId getPersistedSegmentId() + { return persistedSegmentId; } /** - * This is to support the case of batch ingestion where the hydrant is persisted but not memory mapped, - * we need to remember the persisted file path in order to regenerate the memory mapped index to + * This is to support the case of batch ingestion where the hydrant is persisted but not memory mapped. 
+ * We need to remember the persisted file path in order to regenerate the memory mapped index to * prepare for merging after all phyisical segmetns are built * @param persistedFile The file that was persisted when this hydrant was persisted */ - public void setPersistedMetadata(File persistedFile,SegmentId segmentId) + public void setPersistedMetadata(File persistedFile, SegmentId segmentId) { this.persistedFile = persistedFile; this.persistedSegmentId = segmentId; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index e5b892c02e0d..106a659e690e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -917,8 +917,7 @@ private DataSegment mergeAndPush( return segment; } - catch ( - Exception e) { + catch (Exception e) { metrics.incrementFailedHandoffs(); log.warn(e, "Failed to push merged index for segment[%s].", identifier); throw new RuntimeException(e); From 0824e213cf6b00d68e7d1662fc8dfb788728f0f7 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Wed, 21 Apr 2021 17:34:48 -0700 Subject: [PATCH 09/21] Log some count stats --- .../appenderator/AppenderatorImpl.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 106a659e690e..e23e15a81783 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListenableFuture; import 
com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.commons.collections4.IterableUtils; import org.apache.druid.client.cache.Cache; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; @@ -569,6 +570,8 @@ public ListenableFuture persistAll(@Nullable final Committer committer) final List> indexesToPersist = new ArrayList<>(); int numPersistedRows = 0; long bytesPersisted = 0L; + AtomicLong totalHydrantsCount = new AtomicLong(); + AtomicLong totalHydrantsPersisted = new AtomicLong(); for (Map.Entry entry : sinks.entrySet()) { final SegmentIdWithShardSpec identifier = entry.getKey(); final Sink sink = entry.getValue(); @@ -576,21 +579,26 @@ public ListenableFuture persistAll(@Nullable final Committer committer) throw new ISE("No sink for identifier: %s", identifier); } final List hydrants = Lists.newArrayList(sink); + totalHydrantsCount.addAndGet(hydrants.size()); currentHydrants.put(identifier.toString(), hydrants.size()); numPersistedRows += sink.getNumRowsInMemory(); bytesPersisted += sink.getBytesInMemory(); final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size(); + // gather hydrants that have not been persisted: for (FireHydrant hydrant : hydrants.subList(0, limit)) { if (!hydrant.hasSwapped()) { log.debug("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier); indexesToPersist.add(Pair.of(hydrant, identifier)); + totalHydrantsPersisted.addAndGet(1); } } if (sink.swappable()) { + // Now create a new hydrant and get the old one to persist it (i.e. if swap): indexesToPersist.add(Pair.of(sink.swap(), identifier)); + totalHydrantsPersisted.addAndGet(1); } } log.debug("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); @@ -598,6 +606,7 @@ public ListenableFuture persistAll(@Nullable final Committer committer) final Object commitMetadata = committer == null ? 
null : committer.getMetadata(); final Stopwatch runExecStopwatch = Stopwatch.createStarted(); final Stopwatch persistStopwatch = Stopwatch.createStarted(); + AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows); final ListenableFuture future = persistExecutor.submit( new Callable() { @@ -651,6 +660,14 @@ public Object call() throws IOException .distinct() .collect(Collectors.joining(", ")) ); + log.info( + "Persisted stats: processed rows: [%d], persisted rows[%d], sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across sinks): [%d]", + rowIngestionMeters.getProcessed(), + totalPersistedRows.get(), + sinks.size(), + totalHydrantsCount.get(), + totalHydrantsPersisted.get() + ); // return null if committer is null return commitMetadata; @@ -693,6 +710,7 @@ public ListenableFuture push( ) { final Map theSinks = new HashMap<>(); + AtomicLong pushedHydrantsCount = new AtomicLong(); for (final SegmentIdWithShardSpec identifier : identifiers) { final Sink sink = sinks.get(identifier); if (sink == null) { @@ -702,6 +720,8 @@ public ListenableFuture push( if (sink.finishWriting()) { totalRows.addAndGet(-sink.getNumRows()); } + // count hydrants for stats: + pushedHydrantsCount.addAndGet(IterableUtils.size(sink)); } return Futures.transform( @@ -711,6 +731,10 @@ public ListenableFuture push( (Function) commitMetadata -> { final List dataSegments = new ArrayList<>(); + log.info("Preparing to push (stats): processed rows: [%d], sinks: [%d], fireHydrants (across sinks): [%d]", + rowIngestionMeters.getProcessed(), theSinks.size(), pushedHydrantsCount.get() + ); + log.debug( "Building and pushing segments: %s", theSinks.keySet().stream().map(SegmentIdWithShardSpec::toString).collect(Collectors.joining(", ")) @@ -734,6 +758,8 @@ public ListenableFuture push( } } + log.info("Push complete..."); + return new SegmentsAndCommitMetadata(dataSegments, commitMetadata); }, pushExecutor From 87f05e85c9d29ac3125ed8e342accdc40116ab97 Mon Sep 17 
00:00:00 2001 From: Agustin Gonzalez Date: Wed, 21 Apr 2021 20:30:33 -0700 Subject: [PATCH 10/21] Make sure sinks size is obtained at the right time --- .../druid/segment/realtime/appenderator/AppenderatorImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index e23e15a81783..30e280264926 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -572,6 +572,7 @@ public ListenableFuture persistAll(@Nullable final Committer committer) long bytesPersisted = 0L; AtomicLong totalHydrantsCount = new AtomicLong(); AtomicLong totalHydrantsPersisted = new AtomicLong(); + final long totalSinks = sinks.size(); for (Map.Entry entry : sinks.entrySet()) { final SegmentIdWithShardSpec identifier = entry.getKey(); final Sink sink = entry.getValue(); @@ -664,7 +665,7 @@ public Object call() throws IOException "Persisted stats: processed rows: [%d], persisted rows[%d], sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across sinks): [%d]", rowIngestionMeters.getProcessed(), totalPersistedRows.get(), - sinks.size(), + totalSinks, totalHydrantsCount.get(), totalHydrantsPersisted.get() ); From e75e2ab37584ec6acff1767bf4d0ad22729ba51f Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Thu, 22 Apr 2021 18:03:37 -0700 Subject: [PATCH 11/21] BatchAppenderator unit test --- .../appenderator/BatchAppenderatorTest.java | 171 ++++++++++ .../appenderator/BatchAppenderatorTester.java | 295 ++++++++++++++++++ .../realtime/appenderator/Appenderator.java | 3 +- 3 files changed, 468 insertions(+), 1 deletion(-) create mode 100644 
indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java new file mode 100644 index 000000000000..731e1b6de44b --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.appenderator; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.realtime.appenderator.Appenderator; +import org.apache.druid.segment.realtime.appenderator.AppenderatorTester; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +public class BatchAppenderatorTest extends InitializedNullHandlingTest +{ + private static final List IDENTIFIERS = ImmutableList.of( + si("2000/2001", "A", 0), + si("2000/2001", "A", 1), + si("2001/2002", "A", 0) + ); + + @Test + public void testSimpleIngestion() throws Exception + { + try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, true)) { + final Appenderator appenderator = tester.getAppenderator(); + boolean thrown; + + // startJob + Assert.assertEquals(null, appenderator.startJob()); + + // getDataSource + Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource()); + + // add + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 2, + appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar", 2), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(1), ir("2000", 
"qux", 4), null) + .getNumRowsInSegment() + ); + + // getSegments + Assert.assertEquals(IDENTIFIERS.subList(0, 2), sorted(appenderator.getSegments())); + + // getRowCount + Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0))); + Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1))); + thrown = false; + try { + appenderator.getRowCount(IDENTIFIERS.get(2)); + } + catch (IllegalStateException e) { + thrown = true; + } + Assert.assertTrue(thrown); + + // push all + final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push( + appenderator.getSegments(), + null, + false + ).get(); + Assert.assertEquals( + IDENTIFIERS.subList(0, 2), + sorted( + Lists.transform( + segmentsAndCommitMetadata.getSegments(), + new Function() + { + @Override + public SegmentIdWithShardSpec apply(DataSegment input) + { + return SegmentIdWithShardSpec.fromDataSegment(input); + } + } + ) + ) + ); + Assert.assertEquals(sorted(tester.getPushedSegments()), sorted(segmentsAndCommitMetadata.getSegments())); + + // clear + appenderator.clear(); + Assert.assertTrue(appenderator.getSegments().isEmpty()); + } + } + + private static SegmentIdWithShardSpec si(String interval, String version, int partitionNum) + { + return new SegmentIdWithShardSpec( + AppenderatorTester.DATASOURCE, + Intervals.of(interval), + version, + new LinearShardSpec(partitionNum) + ); + } + + static InputRow ir(String ts, String dim, Object met) + { + return new MapBasedInputRow( + DateTimes.of(ts).getMillis(), + ImmutableList.of("dim"), + ImmutableMap.of( + "dim", + dim, + "met", + met + ) + ); + } + + private static List sorted(final List xs) + { + final List xsSorted = Lists.newArrayList(xs); + Collections.sort( + xsSorted, + (T a, T b) -> { + if (a instanceof SegmentIdWithShardSpec && b instanceof SegmentIdWithShardSpec) { + return ((SegmentIdWithShardSpec) a).compareTo(((SegmentIdWithShardSpec) b)); + } else if (a instanceof DataSegment && b instanceof DataSegment) { + return 
((DataSegment) a).getId().compareTo(((DataSegment) b).getId()); + } else { + throw new IllegalStateException("BAD"); + } + } + ); + return xsSorted; + } + +} + diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java new file mode 100644 index 000000000000..5b74b99c683a --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.appenderator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JSONParseSpec; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.task.IndexTask; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexMerger; +import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.incremental.ParseExceptionHandler; +import org.apache.druid.segment.incremental.RowIngestionMeters; +import org.apache.druid.segment.incremental.SimpleRowIngestionMeters; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.realtime.FireDepartmentMetrics; +import org.apache.druid.segment.realtime.appenderator.Appenderator; +import org.apache.druid.segment.realtime.appenderator.Appenderators; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; + +import java.io.File; +import java.io.IOException; 
+import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +public class BatchAppenderatorTester implements AutoCloseable +{ + public static final String DATASOURCE = "foo"; + + private final DataSchema schema; + private final IndexTask.IndexTuningConfig tuningConfig; + private final FireDepartmentMetrics metrics; + private final DataSegmentPusher dataSegmentPusher; + private final ObjectMapper objectMapper; + private final Appenderator appenderator; + private final IndexIO indexIO; + private final IndexMerger indexMerger; + private final ServiceEmitter emitter; + + private final List pushedSegments = new CopyOnWriteArrayList<>(); + + public BatchAppenderatorTester( + final int maxRowsInMemory + ) + { + this(maxRowsInMemory, -1, null, false); + } + + public BatchAppenderatorTester( + final int maxRowsInMemory, + final boolean enablePushFailure + ) + { + this(maxRowsInMemory, -1, null, enablePushFailure); + } + + public BatchAppenderatorTester( + final int maxRowsInMemory, + final long maxSizeInBytes, + final File basePersistDirectory, + final boolean enablePushFailure + ) + { + this( + maxRowsInMemory, + maxSizeInBytes, + basePersistDirectory, + enablePushFailure, + new SimpleRowIngestionMeters(), + false + ); + } + + public BatchAppenderatorTester( + final int maxRowsInMemory, + final long maxSizeInBytes, + final File basePersistDirectory, + final boolean enablePushFailure, + final RowIngestionMeters rowIngestionMeters, + final boolean skipBytesInMemoryOverheadCheck + ) + { + objectMapper = new DefaultObjectMapper(); + objectMapper.registerSubtypes(LinearShardSpec.class); + + final Map parserMap = objectMapper.convertValue( + new MapInputRowParser( + new JSONParseSpec( + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(null, null, null), + null, + null, + null + ) + ), + Map.class + ); + schema = new DataSchema( + DATASOURCE, + parserMap, + new AggregatorFactory[]{ + new 
CountAggregatorFactory("count"), + new LongSumAggregatorFactory("met", "met") + }, + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null), + null, + objectMapper + ); + tuningConfig = new IndexTask.IndexTuningConfig( + null, + 2, + null, + maxRowsInMemory, + maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes, + false, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + true, + null, + null, + null, + null + ).withBasePersistDirectory(createNewBasePersistDirectory()); + + metrics = new FireDepartmentMetrics(); + + indexIO = new IndexIO( + objectMapper, + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()); + + emitter = new ServiceEmitter( + "test", + "test", + new NoopEmitter() + ); + emitter.start(); + EmittingLogger.registerEmitter(emitter); + dataSegmentPusher = new DataSegmentPusher() + { + private boolean mustFail = true; + + @Deprecated + @Override + public String getPathForHadoop(String dataSource) + { + return getPathForHadoop(); + } + + @Override + public String getPathForHadoop() + { + throw new UnsupportedOperationException(); + } + + @Override + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException + { + if (enablePushFailure && mustFail) { + mustFail = false; + throw new IOException("Push failure test"); + } else if (enablePushFailure) { + mustFail = true; + } + pushedSegments.add(segment); + return segment; + } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } + }; + appenderator = Appenderators.createOffline( + schema.getDataSource(), + schema, + tuningConfig, + metrics, + dataSegmentPusher, + objectMapper, + indexIO, + indexMerger, + rowIngestionMeters, + new 
ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0) + ); + } + + private long getDefaultMaxBytesInMemory() + { + return (Runtime.getRuntime().totalMemory()) / 3; + } + + public DataSchema getSchema() + { + return schema; + } + + public IndexTask.IndexTuningConfig getTuningConfig() + { + return tuningConfig; + } + + public FireDepartmentMetrics getMetrics() + { + return metrics; + } + + public DataSegmentPusher getDataSegmentPusher() + { + return dataSegmentPusher; + } + + public ObjectMapper getObjectMapper() + { + return objectMapper; + } + + public Appenderator getAppenderator() + { + return appenderator; + } + + public List getPushedSegments() + { + return pushedSegments; + } + + @Override + public void close() throws Exception + { + appenderator.close(); + emitter.close(); + FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory()); + } + + private static File createNewBasePersistDirectory() + { + return FileUtils.createTempDir("druid-batch-persist"); + } +} diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java index 2c61418730e0..1d7da70837a7 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java @@ -251,7 +251,8 @@ SegmentIdWithShardSpec getSegmentIdentifier() return segmentIdentifier; } - int getNumRowsInSegment() + @VisibleForTesting + public int getNumRowsInSegment() { return numRowsInSegment; } From c5f284000e57a04992484a23b97d8fc07c1874bc Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Thu, 22 Apr 2021 18:46:36 -0700 Subject: [PATCH 12/21] Fix comment typos --- .../apache/druid/segment/realtime/FireHydrant.java | 12 +++++++++--- .../realtime/appenderator/AppenderatorImpl.java | 8 ++++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java index b96e49327919..3de9ec03cc39 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java @@ -40,6 +40,7 @@ import java.util.function.Function; /** + * */ public class FireHydrant { @@ -50,14 +51,18 @@ public class FireHydrant private SegmentId persistedSegmentId; /** - * - * @return The persisted file path, this will be null for real time hydrants + * @return The persisted file path. This is needed to recreate mapped files before merging. + * it will be null for real time hydrants. */ public @Nullable File getPersistedFile() { return persistedFile; } + /** + * @return The persisted segment id. This is needed to recreate mapped files before merging. + * It will be null for real time hydrants + */ public @Nullable SegmentId getPersistedSegmentId() { return persistedSegmentId; @@ -66,7 +71,8 @@ public class FireHydrant /** * This is to support the case of batch ingestion where the hydrant is persisted but not memory mapped. 
* We need to remember the persisted file path in order to regenerate the memory mapped index to - * prepare for merging after all phyisical segmetns are built + * prepare for merging after all physical segments are built + * * @param persistedFile The file that was persisted when this hydrant was persisted */ public void setPersistedMetadata(File persistedFile, SegmentId segmentId) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 30e280264926..d4e9a5127975 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -597,7 +597,7 @@ public ListenableFuture persistAll(@Nullable final Committer committer) } if (sink.swappable()) { - // Now create a new hydrant and get the old one to persist it (i.e. if swap): + // It is swappable. Get the old one to persist it and create a new one: indexesToPersist.add(Pair.of(sink.swap(), identifier)); totalHydrantsPersisted.addAndGet(1); } @@ -917,7 +917,7 @@ private DataSegment mergeAndPush( ); if (!isRealTime()) { - // Drop the queriable indexes behind the hydrants... they are not needed anymore and their + // Drop the queryable indexes behind the hydrants... they are not needed anymore and their // mapped file references // can generate OOMs during merge if enough of them are held back... 
for (FireHydrant fireHydrant : sink) { @@ -1489,7 +1489,7 @@ private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec id numRows ); - // Do not map unless it is being driven by a real time task: + // Map only when this appenderator is being driven by a real time task: Segment segmentToSwap = null; if (isRealTime()) { segmentToSwap = new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId()); @@ -1538,7 +1538,7 @@ private int calculateMMappedHydrantMemoryInUsed(FireHydrant hydrant) int total; total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT; if (isRealTime()) { - // for real time ad references to byte memory mapped references.. + // for real time add references to byte memory mapped references.. total += (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) + (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) + ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER; From 6445eb243fd1799ce7d3ae12bd138e3da8170300 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Mon, 26 Apr 2021 12:08:38 -0700 Subject: [PATCH 13/21] Renamed methods to make them more readable --- .../appenderator/BatchAppenderatorTest.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java index 731e1b6de44b..c5831fc217e2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java @@ -43,9 +43,9 @@ public class BatchAppenderatorTest extends InitializedNullHandlingTest { private static final List IDENTIFIERS = ImmutableList.of( - si("2000/2001", "A", 0), - si("2000/2001", "A", 1), - si("2001/2002", "A", 0) 
+ createSegmentId("2000/2001", "A", 0), + createSegmentId("2000/2001", "A", 1), + createSegmentId("2001/2002", "A", 0) ); @Test @@ -64,19 +64,19 @@ public void testSimpleIngestion() throws Exception // add Assert.assertEquals( 1, - appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), null) + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null) .getNumRowsInSegment() ); Assert.assertEquals( 2, - appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar", 2), null) + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null) .getNumRowsInSegment() ); Assert.assertEquals( 1, - appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 4), null) + appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null) .getNumRowsInSegment() ); @@ -125,17 +125,18 @@ public SegmentIdWithShardSpec apply(DataSegment input) } } - private static SegmentIdWithShardSpec si(String interval, String version, int partitionNum) + private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum) { return new SegmentIdWithShardSpec( AppenderatorTester.DATASOURCE, Intervals.of(interval), version, new LinearShardSpec(partitionNum) + ); } - static InputRow ir(String ts, String dim, Object met) + static InputRow createInputRow(String ts, String dim, Object met) { return new MapBasedInputRow( DateTimes.of(ts).getMillis(), From 0de3cb923e2b516ceec122809ca36bf6040d0b02 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Wed, 28 Apr 2021 16:45:53 -0700 Subject: [PATCH 14/21] Move persisted metadata from FireHydrant class to AppenderatorImpl. Removed superfluous differences and fix comment typo. 
Removed custom comparator --- .../appenderator/BatchAppenderatorTest.java | 30 +++------------ .../druid/segment/realtime/FireHydrant.java | 36 ------------------ .../appenderator/AppenderatorImpl.java | 37 ++++++++++++++----- 3 files changed, 33 insertions(+), 70 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java index c5831fc217e2..f8a1bd047b25 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java @@ -37,8 +37,8 @@ import org.junit.Assert; import org.junit.Test; -import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; public class BatchAppenderatorTest extends InitializedNullHandlingTest { @@ -81,7 +81,8 @@ public void testSimpleIngestion() throws Exception ); // getSegments - Assert.assertEquals(IDENTIFIERS.subList(0, 2), sorted(appenderator.getSegments())); + Assert.assertEquals(IDENTIFIERS.subList(0, 2), + appenderator.getSegments().stream().sorted().collect(Collectors.toList())); // getRowCount Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0))); @@ -103,7 +104,6 @@ public void testSimpleIngestion() throws Exception ).get(); Assert.assertEquals( IDENTIFIERS.subList(0, 2), - sorted( Lists.transform( segmentsAndCommitMetadata.getSegments(), new Function() @@ -114,12 +114,11 @@ public SegmentIdWithShardSpec apply(DataSegment input) return SegmentIdWithShardSpec.fromDataSegment(input); } } - ) - ) + ).stream().sorted().collect(Collectors.toList()) ); - Assert.assertEquals(sorted(tester.getPushedSegments()), sorted(segmentsAndCommitMetadata.getSegments())); + Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()), + 
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())); - // clear appenderator.clear(); Assert.assertTrue(appenderator.getSegments().isEmpty()); } @@ -150,23 +149,6 @@ static InputRow createInputRow(String ts, String dim, Object met) ); } - private static List sorted(final List xs) - { - final List xsSorted = Lists.newArrayList(xs); - Collections.sort( - xsSorted, - (T a, T b) -> { - if (a instanceof SegmentIdWithShardSpec && b instanceof SegmentIdWithShardSpec) { - return ((SegmentIdWithShardSpec) a).compareTo(((SegmentIdWithShardSpec) b)); - } else if (a instanceof DataSegment && b instanceof DataSegment) { - return ((DataSegment) a).getId().compareTo(((DataSegment) b).getId()); - } else { - throw new IllegalStateException("BAD"); - } - } - ); - return xsSorted; - } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java index 3de9ec03cc39..29a8986a04cf 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java @@ -34,53 +34,17 @@ import javax.annotation.Nullable; import java.io.Closeable; -import java.io.File; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; /** - * */ public class FireHydrant { private final int count; private final AtomicReference adapter; private volatile IncrementalIndex index; - private File persistedFile; - private SegmentId persistedSegmentId; - - /** - * @return The persisted file path. This is needed to recreate mapped files before merging. - * it will be null for real time hydrants. - */ - public @Nullable File getPersistedFile() - { - return persistedFile; - } - - /** - * @return The persisted segment id. This is needed to recreate mapped files before merging. 
- * It will be null for real time hydrants - */ - public @Nullable SegmentId getPersistedSegmentId() - { - return persistedSegmentId; - } - - /** - * This is to support the case of batch ingestion where the hydrant is persisted but not memory mapped. - * We need to remember the persisted file path in order to regenerate the memory mapped index to - * prepare for merging after all physical segments are built - * - * @param persistedFile The file that was persisted when this hydrant was persisted - */ - public void setPersistedMetadata(File persistedFile, SegmentId segmentId) - { - this.persistedFile = persistedFile; - this.persistedSegmentId = segmentId; - } - public FireHydrant(IncrementalIndex index, int count, SegmentId segmentId) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index d4e9a5127975..1baef98a555c 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -72,6 +72,7 @@ import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.joda.time.Interval; @@ -164,13 +165,18 @@ public class AppenderatorImpl implements Appenderator private volatile Throwable persistError; private final boolean isRealTime; + // Use next Map to store metadata (File, SegmentId) for a hydrant for batch appenderator + // in order to facilitate the mapping of the QueryableIndex associated with a given hydrant + // at merge time. This is necessary since batch appenderator will not map the QueryableIndex + // at persist time in order to minimize its memory footprint. 
+ private final Map> persistedHydrantMetadata = new HashMap<>(); /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. - *

+ * * The sinkTimeline is set to the sink timeline of the provided SinkQuerySegmentWalker. * If the SinkQuerySegmentWalker is null, a new sink timeline is initialized. - *

+ * * It is used by UnifiedIndexerAppenderatorsManager which allows queries on data associated with multiple * Appenderators. */ @@ -787,6 +793,7 @@ private ListenableFuture pushBarrier() * @param identifier sink identifier * @param sink sink to push * @param useUniquePath true if the segment should be written to a path with a unique identifier + * * @return segment descriptor, or null if the sink is no longer valid */ @Nullable @@ -854,18 +861,28 @@ private DataSegment mergeAndPush( // if batch, swap/persist did not memory map the incremental index, we need it mapped now: if (!isRealTime()) { + + // sanity + Pair persistedMetadata = persistedHydrantMetadata.get(fireHydrant); + if (persistedMetadata == null) { + throw new ISE("Persisted metadata for batch hydrant [%s] is null!", fireHydrant); + } + + File persistedFile = persistedMetadata.lhs; + SegmentId persistedSegmentId = persistedMetadata.rhs; + // sanity: - if (fireHydrant.getPersistedFile() == null) { - throw new ISE("Persisted file for batch hydrant is null!"); - } else if (fireHydrant.getPersistedSegmentId() == null) { + if (persistedFile == null) { + throw new ISE("Persisted file for batch hydrant [%s] is null!", fireHydrant); + } else if (persistedSegmentId == null) { throw new ISE( "Persisted segmentId for batch hydrant in file [%s] is null!", - fireHydrant.getPersistedFile().getPath() + persistedFile.getPath() ); } fireHydrant.swapSegment(new QueryableIndexSegment( - indexIO.loadIndex(fireHydrant.getPersistedFile()), - fireHydrant.getPersistedSegmentId() + indexIO.loadIndex(persistedFile), + persistedSegmentId )); } @@ -949,7 +966,6 @@ private DataSegment mergeAndPush( log.warn(e, "Failed to push merged index for segment[%s].", identifier); throw new RuntimeException(e); } - } @Override @@ -1451,6 +1467,7 @@ private File createPersistDirIfNeeded(SegmentIdWithShardSpec identifier) throws * * @param indexToPersist hydrant to persist * @param identifier the segment this hydrant is going to be part of + * 
* @return the number of rows persisted */ private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec identifier) @@ -1495,7 +1512,7 @@ private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec id segmentToSwap = new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId()); } else { // remember file path & segment id to rebuild the queryable index for merge: - indexToPersist.setPersistedMetadata(persistedFile, indexToPersist.getSegmentId()); + persistedHydrantMetadata.put(indexToPersist, new Pair<>(persistedFile, indexToPersist.getSegmentId())); } indexToPersist.swapSegment(segmentToSwap); From 63559e8d7e2ee10fb7c668e3260dade73c39e58f Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Thu, 29 Apr 2021 11:52:04 -0700 Subject: [PATCH 15/21] Missing dependency --- server/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/pom.xml b/server/pom.xml index 2c5ad5b16161..f19db30976cf 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -454,6 +454,11 @@ 1.3 test + + org.apache.commons + commons-collections4 + 4.2 + From 699d6564f521aabde1be11a5351ba14b1cd27a0f Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Mon, 3 May 2021 14:57:18 -0700 Subject: [PATCH 16/21] Make persisted hydrant metadata map concurrent and better reflect the fact that keys are Java references. Maintain persisted metadata when dropping/closing segments. 
--- .../appenderator/AppenderatorImpl.java | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 1baef98a555c..9130fc3cc815 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -86,7 +86,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -165,11 +167,17 @@ public class AppenderatorImpl implements Appenderator private volatile Throwable persistError; private final boolean isRealTime; - // Use next Map to store metadata (File, SegmentId) for a hydrant for batch appenderator - // in order to facilitate the mapping of the QueryableIndex associated with a given hydrant - // at merge time. This is necessary since batch appenderator will not map the QueryableIndex - // at persist time in order to minimize its memory footprint. - private final Map> persistedHydrantMetadata = new HashMap<>(); + /** + * Use next Map to store metadata (File, SegmentId) for a hydrant for batch appenderator + * in order to facilitate the mapping of the QueryableIndex associated with a given hydrant + * at merge time. This is necessary since batch appenderator will not map the QueryableIndex + * at persist time in order to minimize its memory footprint. This has to be synchronized since the + * map may be accessed from multiple threads. + * Use {@link IdentityHashMap} to better reflect the fact that the key needs to be interpreted + * with reference semantics. 
+ */ + private final Map> persistedHydrantMetadata = + Collections.synchronizedMap(new IdentityHashMap<>()); /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. @@ -548,6 +556,9 @@ public void clear() throws InterruptedException futures.add(abandonSegment(entry.getKey(), entry.getValue(), true)); } + // Re-initialize hydrant map: + persistedHydrantMetadata.clear(); + // Await dropping. Futures.allAsList(futures).get(); } @@ -1392,6 +1403,8 @@ public Void apply(@Nullable Object input) cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); } hydrant.swapSegment(null); + // remove hydrant from persisted metadata: + persistedHydrantMetadata.remove(hydrant); } if (removeOnDiskData) { From 09c2d2f1b4e65ad242044be29be12faee3e9f608 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Tue, 4 May 2021 15:38:34 -0700 Subject: [PATCH 17/21] Replaced concurrent variables with normal ones --- .../realtime/appenderator/AppenderatorImpl.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 9130fc3cc815..a507f4d0b3f6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.collections4.IterableUtils; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.druid.client.cache.Cache; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; @@ -587,8 +588,8 @@ public ListenableFuture persistAll(@Nullable final Committer committer) final List> indexesToPersist = new 
ArrayList<>(); int numPersistedRows = 0; long bytesPersisted = 0L; - AtomicLong totalHydrantsCount = new AtomicLong(); - AtomicLong totalHydrantsPersisted = new AtomicLong(); + MutableLong totalHydrantsCount = new MutableLong(); + MutableLong totalHydrantsPersisted = new MutableLong(); final long totalSinks = sinks.size(); for (Map.Entry entry : sinks.entrySet()) { final SegmentIdWithShardSpec identifier = entry.getKey(); @@ -597,7 +598,7 @@ public ListenableFuture persistAll(@Nullable final Committer committer) throw new ISE("No sink for identifier: %s", identifier); } final List hydrants = Lists.newArrayList(sink); - totalHydrantsCount.addAndGet(hydrants.size()); + totalHydrantsCount.add(hydrants.size()); currentHydrants.put(identifier.toString(), hydrants.size()); numPersistedRows += sink.getNumRowsInMemory(); bytesPersisted += sink.getBytesInMemory(); @@ -609,14 +610,14 @@ public ListenableFuture persistAll(@Nullable final Committer committer) if (!hydrant.hasSwapped()) { log.debug("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier); indexesToPersist.add(Pair.of(hydrant, identifier)); - totalHydrantsPersisted.addAndGet(1); + totalHydrantsPersisted.add(1); } } if (sink.swappable()) { // It is swappable. 
Get the old one to persist it and create a new one: indexesToPersist.add(Pair.of(sink.swap(), identifier)); - totalHydrantsPersisted.addAndGet(1); + totalHydrantsPersisted.add(1); } } log.debug("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); @@ -683,8 +684,8 @@ public Object call() throws IOException rowIngestionMeters.getProcessed(), totalPersistedRows.get(), totalSinks, - totalHydrantsCount.get(), - totalHydrantsPersisted.get() + totalHydrantsCount.longValue(), + totalHydrantsPersisted.longValue() ); // return null if committer is null From faadfbd4f4738d1bbf5b9ca5ad89bec9f496f614 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Thu, 6 May 2021 11:20:33 -0700 Subject: [PATCH 18/21] Added batchMemoryMappedIndex "fallback" flag with default "false". Set this to "true" make code fallback to previous code path. --- docs/configuration/index.md | 1 + .../indexing/kafka/KafkaIndexTaskTest.java | 1 + .../kinesis/KinesisIndexTaskTest.java | 1 + .../indexing/common/config/TaskConfig.java | 11 ++- .../common/task/BatchAppenderators.java | 3 +- .../appenderator/BatchAppenderatorTest.java | 83 ++++++++++++++++++- .../appenderator/BatchAppenderatorTester.java | 24 +++--- .../indexing/common/TaskToolboxTest.java | 2 +- ...penderatorDriverRealtimeIndexTaskTest.java | 1 + .../common/task/CompactionTaskRunTest.java | 2 +- .../common/task/CompactionTaskTest.java | 2 +- .../indexing/common/task/HadoopTaskTest.java | 1 + .../common/task/IngestionTestBase.java | 2 +- .../common/task/RealtimeIndexTaskTest.java | 1 + .../common/task/TestAppenderatorsManager.java | 6 +- ...stractParallelIndexSupervisorTaskTest.java | 5 +- .../SingleTaskBackgroundRunnerTest.java | 1 + .../indexing/overlord/TaskLifecycleTest.java | 2 +- .../worker/WorkerTaskManagerTest.java | 1 + .../worker/WorkerTaskMonitorTest.java | 1 + ...ntermediaryDataManagerAutoCleanupTest.java | 1 + ...iaryDataManagerManualAddAndDeleteTest.java | 1 + .../shuffle/ShuffleDataSegmentPusherTest.java | 1 
+ .../worker/shuffle/ShuffleResourceTest.java | 1 + .../realtime/appenderator/Appenderators.java | 5 +- .../appenderator/AppenderatorsManager.java | 3 +- .../DefaultOfflineAppenderatorFactory.java | 3 +- ...DummyForInjectionAppenderatorsManager.java | 3 +- .../PeonAppenderatorsManager.java | 6 +- .../UnifiedIndexerAppenderatorsManager.java | 6 +- ...nifiedIndexerAppenderatorsManagerTest.java | 3 +- 31 files changed, 148 insertions(+), 36 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index d49bf75675fa..6c9f3aba86d5 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1320,6 +1320,7 @@ Additional peon configs include: |`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote| |`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`| |`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`| +|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not map indexes thus saving heap space. This does not apply to streaming ingestion, just to batch.|`false`| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5| |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. 
Used for indexing generating tasks.|75000| |`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M| diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 128810463d91..34ef75cc87a3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2782,6 +2782,7 @@ private void makeToolboxFactory() throws IOException null, null, null, + false, false ); final TestDerbyConnector derbyConnector = derby.getConnector(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 6b497598534a..19d2445f7ee2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2868,6 +2868,7 @@ private void makeToolboxFactory() throws IOException null, null, null, + false, false ); final TestDerbyConnector derbyConnector = derby.getConnector(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index bf887e500e6e..2750f94f1497 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -70,6 +70,9 @@ public class 
TaskConfig @JsonProperty private final boolean ignoreTimestampSpecForDruidInputSource; + @JsonProperty + private final boolean batchMemoryMappedIndex; + @JsonCreator public TaskConfig( @JsonProperty("baseDir") String baseDir, @@ -81,7 +84,8 @@ public TaskConfig( @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout, @JsonProperty("directoryLockTimeout") Period directoryLockTimeout, @JsonProperty("shuffleDataLocations") List shuffleDataLocations, - @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource + @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource, + @JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMapIndex // only set to true to fall back to older behavior ) { this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir; @@ -107,6 +111,7 @@ public TaskConfig( this.shuffleDataLocations = shuffleDataLocations; } this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource; + this.batchMemoryMappedIndex = batchMemoryMapIndex; } @JsonProperty @@ -189,6 +194,10 @@ public boolean isIgnoreTimestampSpecForDruidInputSource() return ignoreTimestampSpecForDruidInputSource; } + @JsonProperty + public boolean getBatchMemoryMappedIndex() { return batchMemoryMappedIndex; } + + private String defaultDir(@Nullable String configParameter, final String defaultVal) { if (configParameter == null) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java index 711f9d478b95..bff138fb7172 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java @@ -80,7 +80,8 @@ public static Appenderator newAppenderator( toolbox.getIndexIO(), 
toolbox.getIndexMergerV9(), rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + toolbox.getConfig().getBatchMemoryMappedIndex() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java index f8a1bd047b25..e4417639f01f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java @@ -49,9 +49,11 @@ public class BatchAppenderatorTest extends InitializedNullHandlingTest ); @Test - public void testSimpleIngestion() throws Exception + public void testSimpleIngestionWithIndexesNotMapped() throws Exception { - try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, true)) { + try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, + false, + false)) { final Appenderator appenderator = tester.getAppenderator(); boolean thrown; @@ -124,6 +126,83 @@ public SegmentIdWithShardSpec apply(DataSegment input) } } + @Test + public void testSimpleIngestionWithIndexesMapped() throws Exception + { + try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, + false, + true)) { + final Appenderator appenderator = tester.getAppenderator(); + boolean thrown; + + // startJob + Assert.assertEquals(null, appenderator.startJob()); + + // getDataSource + Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource()); + + // add + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 2, + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), 
null) + .getNumRowsInSegment() + ); + + // getSegments + Assert.assertEquals(IDENTIFIERS.subList(0, 2), + appenderator.getSegments().stream().sorted().collect(Collectors.toList())); + + // getRowCount + Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0))); + Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1))); + thrown = false; + try { + appenderator.getRowCount(IDENTIFIERS.get(2)); + } + catch (IllegalStateException e) { + thrown = true; + } + Assert.assertTrue(thrown); + + // push all + final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push( + appenderator.getSegments(), + null, + false + ).get(); + Assert.assertEquals( + IDENTIFIERS.subList(0, 2), + Lists.transform( + segmentsAndCommitMetadata.getSegments(), + new Function() + { + @Override + public SegmentIdWithShardSpec apply(DataSegment input) + { + return SegmentIdWithShardSpec.fromDataSegment(input); + } + } + ).stream().sorted().collect(Collectors.toList()) + ); + Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()), + segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())); + + appenderator.clear(); + Assert.assertTrue(appenderator.getSegments().isEmpty()); + } + } private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum) { return new SegmentIdWithShardSpec( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java index 5b74b99c683a..4058aff938e5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java @@ -74,26 +74,21 @@ public class BatchAppenderatorTester implements AutoCloseable private final List pushedSegments = new 
CopyOnWriteArrayList<>(); - public BatchAppenderatorTester( - final int maxRowsInMemory - ) - { - this(maxRowsInMemory, -1, null, false); - } - public BatchAppenderatorTester( final int maxRowsInMemory, - final boolean enablePushFailure + final boolean enablePushFailure, + boolean batchMemoryMappedIndex ) { - this(maxRowsInMemory, -1, null, enablePushFailure); + this(maxRowsInMemory, -1, null, enablePushFailure, batchMemoryMappedIndex); } public BatchAppenderatorTester( final int maxRowsInMemory, final long maxSizeInBytes, final File basePersistDirectory, - final boolean enablePushFailure + final boolean enablePushFailure, + boolean batchMemoryMappedIndex ) { this( @@ -102,7 +97,8 @@ public BatchAppenderatorTester( basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters(), - false + false, + batchMemoryMappedIndex ); } @@ -112,7 +108,8 @@ public BatchAppenderatorTester( final File basePersistDirectory, final boolean enablePushFailure, final RowIngestionMeters rowIngestionMeters, - final boolean skipBytesInMemoryOverheadCheck + final boolean skipBytesInMemoryOverheadCheck, + boolean batchMemoryMappedIndex ) { objectMapper = new DefaultObjectMapper(); @@ -236,7 +233,8 @@ public Map makeLoadSpec(URI uri) indexIO, indexMerger, rowIngestionMeters, - new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0) + new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), + batchMemoryMappedIndex ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index a5e889d2726b..f1a72ddaa59c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -102,7 +102,7 @@ public void setUp() throws IOException EasyMock.replay(task, mockHandoffNotifierFactory); taskToolbox = new 
TaskToolboxFactory( - new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null, null, false), + new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null, null, false, false), new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), mockTaskActionClientFactory, mockEmitter, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 46fa4fac6a1c..113d41a864af 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1516,6 +1516,7 @@ public SegmentPublishResult announceHistoricalSegments( null, null, null, + false, false ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 03acabd620b9..a958ee66203f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -1298,7 +1298,7 @@ public List getLocations() ); return new TaskToolbox( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), null, createActionClient(task), null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index c4faa5b2754e..330ccbf64945 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -1747,7 +1747,7 @@ private static class TestTaskToolbox extends TaskToolbox ) { super( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), null, taskActionClient, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java index caaeea253c73..97d6df15b328 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java @@ -117,6 +117,7 @@ public TaskStatus runTask(TaskToolbox toolbox) null, null, null, + false, false )).once(); EasyMock.replay(toolbox); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 6a6aae2b4d4f..e12f8762b87c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -312,7 +312,7 @@ public ListenableFuture run(Task task) ); final TaskToolbox box = new TaskToolbox( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), taskActionClient, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 571566223a0e..0a2e34d7d6d7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -898,6 +898,7 @@ private TaskToolbox makeToolbox( null, null, null, + false, false ); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, mdc); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java index 2a342b7ccdd3..6e963e6f9136 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java @@ -105,7 +105,8 @@ public Appenderator createOfflineAppenderatorForTask( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { return Appenderators.createOffline( @@ -118,7 +119,8 @@ public Appenderator createOfflineAppenderatorForTask( indexIO, indexMerger, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + batchMemoryMappedIndex ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index ef7c668b2df1..f3a3c4f32f68 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -204,6 +204,7 @@ public void setUpAbstractParallelIndexSupervisorTaskTest() throws IOException null, null, ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)), + false, false ), null @@ -522,7 +523,7 @@ public Set getPublishedSegments(Task task) public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO) { - final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false); + final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false, false); objectMapper.setInjectableValues( new InjectableValues.Std() @@ -557,7 +558,7 @@ public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO) protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException { return new TaskToolbox( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), actionClient, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 120c7090d93d..c05cdb8a90b6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -90,6 +90,7 @@ public void setup() throws IOException null, null, null, + false, false ); final ServiceEmitter emitter = new NoopServiceEmitter(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index e3ba195fce1a..256e0aeff664 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -599,7 +599,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( new TaskAuditLogConfig(true) ); File tmpDir = temporaryFolder.newFolder(); - taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false); + taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false, false); return new TaskToolboxFactory( taskConfig, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index 1bb33f0b457c..2dd94e8c2875 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -89,6 +89,7 @@ private WorkerTaskManager createWorkerTaskManager() null, null, null, + false, false ); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index 2a6e79316328..c1845097996d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -163,6 +163,7 @@ private WorkerTaskMonitor createTaskMonitor() null, null, null, + false, false ); TaskActionClientFactory taskActionClientFactory = 
EasyMock.createNiceMock(TaskActionClientFactory.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java index 76f26fe92da6..3ccf8fda034e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java @@ -88,6 +88,7 @@ public Period getIntermediaryPartitionTimeout() null, null, ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java index 3c87bf4074d0..3c7d19fd9055 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java @@ -71,6 +71,7 @@ public void setup() throws IOException null, null, ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java index 54f6c0eef724..a7a76c8f4616 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java @@ -70,6 +70,7 @@ public void setup() throws IOException null, null, ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java index 741956a53f7b..78547b299e39 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java @@ -96,6 +96,7 @@ public Period getIntermediaryPartitionTimeout() null, null, ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient() diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java index 00be53e8723f..ff8799daeba4 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java @@ -104,7 +104,8 @@ public static Appenderator createOffline( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { return new AppenderatorImpl( @@ -121,7 +122,7 @@ public static Appenderator createOffline( null, rowIngestionMeters, 
parseExceptionHandler, - false + batchMemoryMappedIndex // This is a task config (default false) to fallback to "old" code in case of bug with the new memory optimization code ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java index 76c64d26a410..c5b22cfb1b57 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java @@ -98,7 +98,8 @@ Appenderator createOfflineAppenderatorForTask( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ); /** diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java index 7a0f1dc6fcd9..8abd0c297672 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java @@ -73,7 +73,8 @@ public Appenderator build(DataSchema schema, RealtimeTuningConfig config, FireDe false, config.isReportParseExceptions() ? 
0 : Integer.MAX_VALUE, 0 - ) + ), + false ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java index 87de2449a479..f1e2f3c91347 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java @@ -90,7 +90,8 @@ public Appenderator createOfflineAppenderatorForTask( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { throw new UOE(ERROR_MSG); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java index 7fa4f4c7c9f0..88a4f5720c18 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java @@ -122,7 +122,8 @@ public Appenderator createOfflineAppenderatorForTask( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { // CompactionTask does run multiple sub-IndexTasks, so we allow multiple batch appenderators @@ -139,7 +140,8 @@ public Appenderator createOfflineAppenderatorForTask( indexIO, indexMerger, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + batchMemoryMappedIndex ); return batchAppenderator; } diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 755319623121..3780c3735753 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -209,7 +209,8 @@ public Appenderator createOfflineAppenderatorForTask( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { synchronized (this) { @@ -228,7 +229,8 @@ public Appenderator createOfflineAppenderatorForTask( indexIO, wrapIndexMerger(indexMerger), rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + batchMemoryMappedIndex ); datasourceBundle.addAppenderator(taskId, appenderator); return appenderator; diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java index f7c85b2ccd08..728e98026d83 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java @@ -84,7 +84,8 @@ public class UnifiedIndexerAppenderatorsManagerTest TestHelper.getTestIndexIO(), TestHelper.getTestIndexMergerV9(OnHeapMemorySegmentWriteOutMediumFactory.instance()), new NoopRowIngestionMeters(), - new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0) + new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0), + false ); @Test From 
9fb20b6b625821816c019bb394694f7118a504e4 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Thu, 6 May 2021 16:12:41 -0700 Subject: [PATCH 19/21] Style fix. --- .../org/apache/druid/indexing/common/config/TaskConfig.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index 2750f94f1497..2882489272f4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -195,7 +195,10 @@ public boolean isIgnoreTimestampSpecForDruidInputSource() } @JsonProperty - public boolean getBatchMemoryMappedIndex() { return batchMemoryMappedIndex; } + public boolean getBatchMemoryMappedIndex() + { + return batchMemoryMappedIndex; + } private String defaultDir(@Nullable String configParameter, final String defaultVal) From a302b887e0f624fa674ffbdc3efa38b96af00fd6 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Fri, 7 May 2021 17:20:54 -0700 Subject: [PATCH 20/21] Added note to new setting in doc, using Iterables.size (and removing a dependency), and fixing a typo in a comment. --- docs/configuration/index.md | 2 +- server/pom.xml | 5 ----- .../segment/realtime/appenderator/AppenderatorImpl.java | 4 ++-- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 6c9f3aba86d5..a4561b062427 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1320,7 +1320,7 @@ Additional peon configs include: |`druid.peon.mode`|Choices are "local" and "remote". 
Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote| |`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`| |`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`| -|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not map indexes thus saving heap space. This does not apply to streaming ingestion, just to batch.|`false`| +|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not map indexes thus saving heap space. This does not apply to streaming ingestion, just to batch. This setting should only be used (to fall back in previous known to be working code path) when there is a bug in the new batch ingestion code that does not keep the index memory mapped.|`false`| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5| |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. 
Used for indexing generating tasks.|75000| |`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M| diff --git a/server/pom.xml b/server/pom.xml index f19db30976cf..2c5ad5b16161 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -454,11 +454,6 @@ 1.3 test - - org.apache.commons - commons-collections4 - 4.2 - diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index a507f4d0b3f6..7e8a802e2c8a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -173,7 +173,7 @@ public class AppenderatorImpl implements Appenderator * in order to facilitate the mapping of the QueryableIndex associated with a given hydrant * at merge time. This is necessary since batch appenderator will not map the QueryableIndex * at persist time in order to minimize its memory footprint. This has to be synchronized since the - * map bay be accessed from multiple threads. + * map may be accessed from multiple threads. * Use {@link IdentityHashMap} to better reflect the fact that the key needs to be interpreted * with reference semantics. 
*/ @@ -740,7 +740,7 @@ public ListenableFuture push( totalRows.addAndGet(-sink.getNumRows()); } // count hydrants for stats: - pushedHydrantsCount.addAndGet(IterableUtils.size(sink)); + pushedHydrantsCount.addAndGet(Iterables.size(sink)); } return Futures.transform( From 383196e30fb8085a5300f812d8c5e4056911a2be Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Mon, 10 May 2021 14:17:51 -0700 Subject: [PATCH 21/21] Forgot to commit this edited documentation message --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index a4561b062427..8d02aa8ed5ca 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1320,7 +1320,7 @@ Additional peon configs include: |`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote| |`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`| |`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`| -|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not map indexes thus saving heap space. This does not apply to streaming ingestion, just to batch. This setting should only be used (to fall back in previous known to be working code path) when there is a bug in the new batch ingestion code that does not keep the index memory mapped.|`false`| +|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not map indexes thus saving heap space. This does not apply to streaming ingestion, just to batch. This setting should only be used when a bug is suspected or found in the new batch ingestion code that avoids memory mapping indices. 
If a bug is suspected or found, you can set this flag to `true` to fall back to previous, working but more memory intensive, code path.|`false`| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5| |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000| |`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|