Make persists concurrent with adding rows in batch ingestion #11536

clintropolis merged 21 commits into apache:master
Conversation
… in the background as well

I replaced the semaphore using the style of concurrency used in the appenderator. Now persists and pushes are run in the background.
    maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
    skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
    if (tuningConfig.getMaxPendingPersists() < 1) {
      maxPendingPersists = DEFAULT_PENDING_PERSISTS;

What is the rationale for the default of 2? The previous default was 0, which is infinite. I don't think we ever need to change this in production. The doc for maxPendingPersists was not updated in #11294, so whatever we change here, we should fix the doc too.

I see it is already documented in the external docs that its default is zero. Good catch, thanks!
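The "0 means infinite" semantics discussed above can be illustrated with a small sketch. This is not Druid's actual executor code; the class and method names are invented for illustration — the point is only that a persist work queue is capacity-bounded when the setting is positive and unbounded when it is zero.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: how maxPendingPersists = 0 can be interpreted as
// "unbounded" when building the persist executor's work queue.
public class PersistQueueSketch
{
  public static BlockingQueue<Runnable> persistQueue(int maxPendingPersists)
  {
    return maxPendingPersists > 0
           ? new ArrayBlockingQueue<>(maxPendingPersists) // bounded: at most N pending persists
           : new LinkedBlockingQueue<>();                 // 0 (the documented default) = unbounded
  }
}
```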
  private void initializeExecutors()
  {
    log.info("There will be up to[%d] pending persists", maxPendingPersists);

How could this be useful except for debugging? If this is only useful for debugging, it should not be at info level.

Changed to debug in the next commit.
   * are being completely removed from memory after each incremental persist.
   */
-  private final Map<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new HashMap<>();
+  private final ConcurrentHashMap<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new ConcurrentHashMap<>();

Please document the details of the concurrent access pattern.

Added comments about the usage for this in the javadoc (next commit).
-  // This variable updated in add(), persist(), and drop()
-  private int rowsCurrentlyInMemory = 0;
+  private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();

What threads can access this and bytesCurrentlyInMemory concurrently? If there are any, please document the details of the concurrent access pattern.

Removed unnecessary atomics in the next commit.
  private volatile Throwable persistError;
  private final ConcurrentHashMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();

Can multiple threads access this map at the same time? I don't see any unless I'm missing something. If there are any, please document the details of the concurrent access pattern. It helps people a lot, including reviewers, other developers, and your future self, to understand and remember how things work. Also please check out https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md. We have reasons for including it in the PR template. I would highly suggest reading it and marking the concurrency self-review item in your PR checklist.

No concurrency control is required for this map; removed in the next commit.
-  appenderator.persistAll(null).get();
+  appenderator.persistAll(null);
+  waitForPersists();

Why not persistAll(null).get()?

Typo; fixed in the next commit.
  private void waitForPersists() throws InterruptedException
  {
    Thread.sleep(500);

Sleeps are bad. They will make the unit tests slower. Also, I bet all the sleeps you added will make these tests quite flaky.

The use case here is to somehow wait for a concurrent task and check its effect, which seems like a legitimate use for sleep. Let me know what you are thinking that could be better.

Oh, I see your suggestion below...
  appenderator.startJob();
  appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
  waitForPersists();

I think you could keep the future of the persist triggered in add() in a variable. Then you can add a method, used only for testing, that returns the persist future. Then you can wait for the future to be done instead of sleeping.

I was trying to avoid more changes to the class (exposing methods, etc.) but I can pursue your suggestion.
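The reviewer's suggestion can be sketched as follows. This is not the actual appenderator code — the class, field, and method names here are invented for illustration. The idea: keep the future of the last background persist in a field, expose a test-only accessor, and have the test join that future instead of sleeping.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of replacing Thread.sleep() in tests with a
// wait on the persist future triggered by add().
public class PersistFutureSketch
{
  private final ExecutorService persistExec = Executors.newSingleThreadExecutor();
  private final AtomicInteger persistCount = new AtomicInteger();
  private volatile CompletableFuture<Void> lastPersistFuture =
      CompletableFuture.completedFuture(null);

  public void add(Object row)
  {
    // ...add the row to the in-memory sink; if a threshold is hit,
    // trigger a background persist and remember its future:
    lastPersistFuture = CompletableFuture.runAsync(persistCount::incrementAndGet, persistExec);
  }

  // Visible-for-testing accessor: the test joins this instead of sleeping.
  CompletableFuture<Void> getLastPersistFuture()
  {
    return lastPersistFuture;
  }

  public int getPersistCount()
  {
    return persistCount.get();
  }

  public void close()
  {
    persistExec.shutdown();
  }
}
```

Joining the future makes the test deterministic: it proceeds exactly when the persist completes, no sooner and no later, eliminating both the flakiness and the fixed 500 ms cost of the sleep.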
BTW, I assumed that the core logic is not changed and reviewed only the concurrency part. Let me know if this is not true.

You are correct, @jihoonson, the core logic is not changed.

Changes I added in the latest commit (507c2f6):
      DatasourceBundle::new
  );

  Appenderator appenderator = Appenderators.createLegacyOffline(

This is wrong, it should be createClosedSegmentsOffline; I will fix it.
This pull request introduces 1 alert when merging 37e0856 into 59d2578 - view on LGTM.com new alerts:

This pull request introduces 1 alert when merging 291420e into 60efbb5 - view on LGTM.com new alerts:

This pull request introduces 1 alert when merging 1b1fd7b into 60efbb5 - view on LGTM.com new alerts:
      "org.apache.hadoop:hadoop-client:2.8.5"
  );

  public enum BatchProcesingMode

BatchProcesingMode -> BatchProcessingMode
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
|`druid.indexer.task.useLegacyBatchProcessing`|If false, native batch ingestion will use a new, recommended code path with memory-optimized code for the segment creation phase. If true, it will use the previous code path for the create segments phase of batch ingestion. This does not apply to streaming ingestion, just to batch. This setting should only be used when a bug is suspected or found in the new optimized batch ingestion code; in that case you can set this flag to `true` to fall back to the previous, working but more memory-intensive, code path.|`false`|
|`druid.indexer.task.batchMemoryMappedIndex`|DEPRECATED: Use `druid.indexer.task.batchProcessingMode` instead. If false, native batch ingestion will use a new, recommended code path with memory-optimized code for the segment creation phase. If true, it will use the previous code path for the create segments phase of batch ingestion. This does not apply to streaming ingestion, just to batch. This setting should only be used when a bug is suspected or found in the new optimized batch ingestion code; in that case you can set this flag to `true` to fall back to the previous, working but more memory-intensive, code path.|`false`|

Maybe call out that setting batchMemoryMappedIndex to true will set batchProcessingMode to LEGACY and overwrite the batchProcessingMode value.
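The precedence the comment asks to document can be sketched in a few lines. This is not the actual TaskConfig code; the class and enum names here are invented for illustration — the point is only that the deprecated boolean, when true, wins over the newer string setting.

```java
// Hypothetical sketch: the deprecated batchMemoryMappedIndex flag, when
// true, forces LEGACY mode and overrides any configured batchProcessingMode.
public class DeprecatedFlagSketch
{
  public enum Mode { OPEN_SEGMENTS, CLOSED_SEGMENTS, LEGACY }

  public static Mode effectiveMode(boolean batchMemoryMappedIndex, Mode configured)
  {
    // the deprecated flag wins: true means "use the legacy code path"
    return batchMemoryMappedIndex ? Mode.LEGACY : configured;
  }
}
```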
  } else {
    // batchProcessingMode input string is invalid, just use the default...log message somewhere???
    this.batchProcessingMode = BatchProcesingMode.CLOSED_SEGMENTS; // Default
  }

Can we log the batchProcessingMode value after this if block?
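The reviewer's suggestion could look like the following sketch. This is not Druid's actual code — the names are illustrative — but it shows the shape: validate the input, fall back to the default on invalid values, and log the final mode once, after the if/else block, so both branches are covered by a single statement.

```java
import java.util.Arrays;

// Hypothetical sketch: resolve the batch processing mode with a default
// fallback, then log the value that actually took effect.
public class BatchModeLoggingSketch
{
  public enum Mode { OPEN_SEGMENTS, CLOSED_SEGMENTS, LEGACY }

  public static Mode resolveAndLog(String configured)
  {
    Mode mode;
    if (Arrays.stream(Mode.values()).anyMatch(m -> m.name().equals(configured))) {
      mode = Mode.valueOf(configured);
    } else {
      // invalid input string: fall back to the default
      mode = Mode.CLOSED_SEGMENTS;
    }
    // single log statement after the if block covers both branches
    System.out.println("Batch processing mode set to: " + mode);
    return mode;
  }
}
```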
  final TaskToolbox box = new TaskToolbox(
-      new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
+      new TaskConfig(null, null, null, null, null, false, null, null, null, false, false,
+                     TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS.name()),

nit: should this use the default instead of this explicit value?
  /**
   * This class is to support LEGACY and CLOSED_SEGMENTS appenderators. It is copied as-is
   * from 0.21 and it is meant to keep for backward compatibility. For now though this class

nit: this isn't actually a copy of 0.21... it's of some intermediary state between 0.21 and 0.22. I think StreamAppenderator is technically a copy of AppenderatorImpl in 0.21.
  import java.util.Map;
  import java.util.concurrent.CopyOnWriteArrayList;

  public class LegacyAndClosedSegmentsAppenderatorTester implements AutoCloseable

super-nit: OpenAndClosed... instead of LegacyAndClosed
  import java.util.function.Function;
  import java.util.stream.Collectors;

  public class LegacyAndClosedSegmentsBatchAppenderatorDriverTest extends EasyMockSupport

same super-nit, replacing Legacy with Open
  import java.util.List;
  import java.util.stream.Collectors;

  public class LegacyAndClosedSegmentsBatchAppenderatorTest extends InitializedNullHandlingTest
This pull request introduces 1 alert when merging c85a201 into dcee99d - view on LGTM.com new alerts:

This pull request introduces 1 alert when merging 86973a1 into dcee99d - view on LGTM.com new alerts:

This pull request introduces 1 alert when merging bd83203 into dcee99d - view on LGTM.com new alerts:

…acoco still refuses to ack coverage

This pull request introduces 1 alert when merging 7f004f1 into dcee99d - view on LGTM.com new alerts:

…hod that was still using "legacy" rather than "openSegments"

This pull request introduces 1 alert when merging d97615f into dcee99d - view on LGTM.com new alerts:
…11536)

* Make persists concurrent with ingestion
* Remove semaphore but keep concurrent persists (with add) and add push in the background as well
* Go back to documented default persists (zero)
* Move to debug
* Remove unnecessary Atomics
* Comments on synchronization (or not) for sinks & sinkMetadata
* Some cleanup for unit tests but they still need further work
* Shutdown & wait for persists and push on close
* Provide support for three existing batch appenderators using batchProcessingMode flag
* Fix reference to wrong appenderator
* Fix doc typos
* Add BatchAppenderators class test coverage
* Add log message to batchProcessingMode final value, fix typo in enum name
* Another typo and minor fix to log message
* LEGACY->OPEN_SEGMENTS, Edit docs
* Minor update legacy->open segments log message
* More code comments, mostly small adjustments to naming etc
* fix spelling
* Exclude BatchAppenderators from Jacoco since it is fully tested but Jacoco still refuses to ack coverage
* Coverage for Appenderators & BatchAppenderators, name change of a method that was still using "legacy" rather than "openSegments"

Co-authored-by: Clint Wylie <cjwylie@gmail.com>
…#11679)

* Make persists concurrent with ingestion
* Remove semaphore but keep concurrent persists (with add) and add push in the background as well
* Go back to documented default persists (zero)
* Move to debug
* Remove unnecessary Atomics
* Comments on synchronization (or not) for sinks & sinkMetadata
* Some cleanup for unit tests but they still need further work
* Shutdown & wait for persists and push on close
* Provide support for three existing batch appenderators using batchProcessingMode flag
* Fix reference to wrong appenderator
* Fix doc typos
* Add BatchAppenderators class test coverage
* Add log message to batchProcessingMode final value, fix typo in enum name
* Another typo and minor fix to log message
* LEGACY->OPEN_SEGMENTS, Edit docs
* Minor update legacy->open segments log message
* More code comments, mostly small adjustments to naming etc
* fix spelling
* Exclude BatchAppenderators from Jacoco since it is fully tested but Jacoco still refuses to ack coverage
* Coverage for Appenderators & BatchAppenderators, name change of a method that was still using "legacy" rather than "openSegments"

Co-authored-by: Clint Wylie <cjwylie@gmail.com>
Co-authored-by: Agustin Gonzalez <agustin.gonzalez@imply.io>
Recent testing has shown a potential performance degradation in the bounded-memory work for segment creation during batch ingestion. The most probable reason is that intermediate persists are blocking and run serially with ingestion. We will add back concurrent persists and test performance to validate that the previous performance is restored.
Changes:
Intermediate persists are made concurrent with adding rows in the batch appenderator.
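The change described above can be illustrated with a minimal sketch. This is not the actual BatchAppenderator code — the class and field names are invented for illustration. It shows the core idea: when the in-memory row limit is hit, the persist is submitted to a background executor so add() returns immediately instead of blocking, and close() waits for all outstanding persists before shutting down.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: persists run concurrently with adding rows.
public class ConcurrentPersistSketch
{
  private final ExecutorService persistExec = Executors.newSingleThreadExecutor();
  private final List<Future<?>> pendingPersists = new ArrayList<>();
  private final List<Integer> buffer = new ArrayList<>();
  // written by the persist thread, read after close(): use a synchronized list
  private final List<Integer> persisted = Collections.synchronizedList(new ArrayList<>());
  private final int maxRowsInMemory;

  public ConcurrentPersistSketch(int maxRowsInMemory)
  {
    this.maxRowsInMemory = maxRowsInMemory;
  }

  public void add(int row)
  {
    buffer.add(row);
    if (buffer.size() >= maxRowsInMemory) {
      final List<Integer> toPersist = new ArrayList<>(buffer);
      buffer.clear();
      // the persist runs in the background; add() returns immediately
      pendingPersists.add(persistExec.submit(() -> persisted.addAll(toPersist)));
    }
  }

  public void close() throws Exception
  {
    for (Future<?> f : pendingPersists) {
      f.get(); // wait for every pending persist before shutting down
    }
    persistExec.shutdown();
  }

  public int persistedCount()
  {
    return persisted.size();
  }
}
```

A single-threaded executor keeps persists ordered relative to each other while still overlapping them with ingestion; bounding its queue (see maxPendingPersists) is what applies back-pressure when persists fall behind.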
We are also interested in supporting three main modes for the batch appenderator, so this PR also implements the following requirements:
This PR has: