'maxBytesInMemory' tuningConfig introduced for ingestion tasks #5583
gianm merged 42 commits into apache:master
Conversation
…for ingestion tasks

Currently, a config called 'maxRowsInMemory' controls how much memory gets used for indexing. If this value is not optimal for your JVM heap size, it can lead to an OutOfMemoryError. A lower value leads to frequent persists, which can hurt query performance, while a higher value limits the number of persists but requires more JVM heap space and can lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem: it limits the total number of bytes kept in memory before persisting.

* The default value is Runtime.maxMemory() / 3
* To maintain the current behaviour, set 'maxBytesInMemory' to -1
* If both 'maxRowsInMemory' and 'maxBytesInMemory' are set, both are respected, i.e. the first one to go above its threshold triggers a persist
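The dual-threshold behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the actual Druid code; the class and method names here are made up for the example.

```java
// Minimal sketch of the dual-threshold persist check described above.
// Names are illustrative; the real logic lives in Druid's incremental
// index and appenderator classes.
public class PersistTrigger
{
  private final long maxRowsInMemory;
  private final long maxBytesInMemory; // a non-positive value disables the byte limit

  public PersistTrigger(long maxRowsInMemory, long maxBytesInMemory)
  {
    this.maxRowsInMemory = maxRowsInMemory;
    this.maxBytesInMemory = maxBytesInMemory;
  }

  // Both limits are respected: the first one crossed triggers a persist.
  public boolean shouldPersist(long rowsInMemory, long bytesInMemory)
  {
    final boolean rowLimitHit = rowsInMemory >= maxRowsInMemory;
    final boolean byteLimitHit = maxBytesInMemory > 0 && bytesInMemory >= maxBytesInMemory;
    return rowLimitHit || byteLimitHit;
  }
}
```

With maxBytesInMemory set to -1 only the row count matters, which matches the "maintain the current behaviour" option above.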
jihoonson left a comment

@surekhasaharan thanks for the nice work! Please consider my comments. Also, would you add a doc for the cool new configuration?
| KafkaTuningConfig that = (KafkaTuningConfig) o; | ||
| return maxRowsInMemory == that.maxRowsInMemory && | ||
| maxRowsPerSegment == that.maxRowsPerSegment && | ||
| maxBytesInMemory == this.maxBytesInMemory && |
Should be that.maxBytesInMemory.

@surekhasaharan Also, these methods can be generated automatically; try cmd-N in IntelliJ -> Generate -> equals/hashCode.
| return "KafkaTuningConfig{" + | ||
| "maxRowsInMemory=" + maxRowsInMemory + | ||
| ", maxRowsPerSegment=" + maxRowsPerSegment + | ||
| ",maxBytesInMemory=" + maxBytesInMemory + |
nit: ", maxBytesInMemory=" as other variables.

This one can also be generated automatically.

Hmm, for every code change, I did reformat the code according to the druid_intelliJ_formatting profile, but it seems that did not correct the format.

I was pointing out a different thing from the code formatting: the formatting is just about adjusting whitespace and such, not editing the actual logic of the toString method. Generating is about generating the actual logic: you can trigger that by deleting the toString method and then generating a new one (cmd-N => Generate).

ah ok, will use that in future.
| { | ||
| final KafkaTuningConfig tuningConfig = new KafkaTuningConfig( | ||
| 1000, | ||
| 1000, null, |
nit: Druid format convention is like

1000,
null,

again reformat does not seem to fix this in IntelliJ; also, should this not be flagged by 'mvn clean -Pstrict -pl '!benchmarks' compile test-compile -B' as a style error? Anyway, will fix.

This is something the formatter probably should do, but it's not perfect. And also, we don't have a checkstyle rule for it. Ideally we should have such a rule - it'd be another good contribution!

Unfortunately, our code format profile doesn't handle every code convention. I usually first do Reformat Code in IntelliJ, and then check whether my code format differs from that of other code.

got it, will remember this and look more closely at the rest of the code to follow the convention.
| { | ||
| final KafkaTuningConfig tuningConfig = new KafkaTuningConfig( | ||
| 1000, | ||
| 1000, null, |
| { | ||
| KafkaTuningConfig original = new KafkaTuningConfig( | ||
| 1, | ||
| 1, null, |
| { | ||
| //timestamp + dims length + dimensionDescsList shared pointer | ||
| long sizeInBytes = Long.BYTES + Integer.BYTES * dims.length + Long.BYTES + Long.BYTES; | ||
| sizeInBytes += dimsKeySize; |
Would you elaborate more on how sizeInBytes is calculated?

I attempted to add more docs, hope it will make it clear.
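For reference, the arithmetic in the snippet above can be sketched as below. This is a hedged reading of the estimate, not the actual patch: the interpretation of the two trailing Long.BYTES terms as reference-sized pointers (the dims array and the shared dimensionDescsList) is an assumption, and the class name is hypothetical.

```java
// Sketch of the per-key size estimate from the diff above: a long timestamp,
// one int index per dimension, two reference-sized (8-byte) pointers, plus
// the variable-length dimension key bytes.
public class KeySizeEstimate
{
  public static long estimate(int numDims, long dimsKeySize)
  {
    long sizeInBytes = Long.BYTES                      // timestamp
                       + (long) Integer.BYTES * numDims // one index per dimension
                       + Long.BYTES                     // dims array pointer (assumed)
                       + Long.BYTES;                    // dimensionDescsList shared pointer
    return sizeInBytes + dimsKeySize;
  }
}
```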
| (sum, aggregator) -> sum += aggregator.getMaxIntermediateSize(), | ||
| (sum1, sum2) -> sum1 + sum2 | ||
| ); | ||
| return maxAggregatorIntermediateSize; |
Would you elaborate more on this? Looks like the actual size might be bigger than the calculated one.

@jihoonson how could it be bigger, if the calculated size is the sum of max intermediate sizes?

(Putting aside the wrinkle that the max intermediate size is for the BufferAggregator, but IncrementalIndex actually uses the Aggregator. I think that should generally be OK since it would be strange for the Aggregator to take up a lot more space than the BufferAggregator.)

IIRC, max intermediate size represents only the size of the intermediate aggregate. However, it doesn't mean that an aggregator uses only that amount of memory. For example, LongSumAggregator keeps two variables like below.

private final BaseLongColumnValueSelector selector;
private long sum;

Ah I see. Yeah, it is definitely inexact in that regard. There are probably a few other overheads we're missing. I think it's safe to assume that every aggregator will have enough overhead for its own object header, and for a pointer to a selector. We could add a factor for that. If you believe this page, it's 128 bits per object: https://gist.github.com/arturmkrtchyan/43d6135e8a15798cc46c

It is a bit unsatisfying how inexact the memory usage approximations are, but I am hopeful they will be good enough to make the system run better out of the box, and that's what matters.

So, should I add 16 bytes per aggregator?
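The 16-byte figure under discussion (an object header plus a selector reference, per the 128-bits-per-object estimate linked above) would roughly be applied like this. Illustrative sketch only; the class and method names are hypothetical, not the patch's actual code.

```java
// Rough per-row aggregator footprint: each aggregator's max intermediate
// size plus ~16 bytes of overhead for its object header and selector
// reference, as discussed in the thread above.
public class AggregatorFootprint
{
  static final long ROUGH_OVERHEAD_PER_AGGREGATOR = 16;

  public static long maxBytesPerRow(long[] maxIntermediateSizes)
  {
    long total = 0;
    for (long size : maxIntermediateSizes) {
      total += size + ROUGH_OVERHEAD_PER_AGGREGATOR;
    }
    return total;
  }
}
```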
| outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount); | ||
| } | ||
| if (!sizeCheck) { | ||
| outOfRowsReason = StringUtils.format("Maximum size in bytes [%d] reached", maxBytesInMemory); |
This might replace outOfRowsReason. It should be checked that both countCheck and sizeCheck are false.

Also, please add a null check for outOfRowsReason before allocating it and throw an exception if it's not null.

How about reporting both reasons in the case where both checks trip at the same time?

I think this is the same as my first comment.

yeah, this was fuzzy and incomplete; fixed it.
| } | ||
| @VisibleForTesting | ||
| long getRowSizeInMemory(SegmentIdentifier identifier) |
Would you rename this method to a more intuitive one? Looks like it returns a row size.
| .setIndexSchema(indexSchema) | ||
| .setReportParseExceptions(reportParseExceptions) | ||
| .setMaxRowCount(maxRowsInMemory) | ||
| .setMaxRowCount(maxRowsInMemory).setMaxBytesInMemory(maxBytesInMemory) |
Please break the line before .setMaxBytesInMemory().
| @@ -39,6 +39,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera | |||
| { | |||
| private static final int defaultMaxRowsInMemory = 75000; | |||
Hmm, maybe we should make the default max rows in memory higher now? I feel like we should, since that way with the default settings, maxBytesInMemory is going to be the one that takes effect.

How about 1000000?

Yeah, we would need to change the default maxRows; I just wasn't sure if I should do this with the current patch. Will change.
| * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that | ||
| * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. | ||
| * | ||
| * <p> |
Please don't add these where they don't already exist. You can get IntelliJ to stop adding them by unchecking "Generate '&lt;p&gt;' on empty lines" in the JavaDoc code style preferences.

ok, changed the javadoc setting and removed this.
| { | ||
| private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75_000; | ||
| private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000; | ||
| private static final long DEFAULT_MAX_BYTES_IN_MEMORY = Runtime.getRuntime().maxMemory() / 3; |
I wish we didn't have to have this in two places. Although I guess the default maxRowsInMemory is also in two places.

Yeah, it is being defined in all the implementations of AppenderatorConfig. What do you think if I define a static final in this interface itself and every implementation uses that? I just tried to follow the same convention as for maxRowsInMemory. The upside will be one place to change the default later; the downside is that every implementation is tied to the same default value, which is the same anyway even now. What do you suggest?

A recent patch created an IndexTaskUtils utility class. Maybe it would make sense to put these defaults there, like IndexTaskUtils.DEFAULT_MAX_ROWS_IN_MEMORY and IndexTaskUtils.DEFAULT_MAX_BYTES_IN_MEMORY.

I have put these 2 defaults there, but there are others. Those can be moved to IndexTaskUtils in later commits.
| if (!canAdd) { | ||
| final boolean countCheck = size() < maxRowCount; | ||
| boolean sizeCheck = true; | ||
| if (maxBytesInMemory != -1) { |
Suggest doing > 0 rather than != -1. That way, any negative number (or zero) means unlimited.
| if (TimeAndDims.EMPTY_ROW_INDEX == prev) { | ||
| numEntries.incrementAndGet(); | ||
| if (maxBytesInMemory != -1) { | ||
| long estimatedRowSize = estimateRowSizeInBytes(key) + maxBytesPerRowForAggregators; |
The method estimateRowSizeInBytes should really be named estimateKeySizeInBytes. Or, it should keep the name estimateRowSizeInBytes, but in that case it should do the + maxBytesPerRowForAggregators part. The point being that the row includes both the timeAndDims key and the aggregators.

Thinking about it a bit more, I think it makes sense to keep the method named estimateRowSizeInBytes and to add the + maxBytesPerRowForAggregators into the method. That way, there's a clear entry point for the logic for estimating a row size, and people can look there if they want to improve it later.

changed per the second comment
| } | ||
| @Test | ||
| public void testMaxBytesInMemory() throws Exception |
This test is good, but please add another one verifying that the limit is applied across more than one sink in the same appenderator.
Thanks @surekhasaharan! I left some review comments. One of them suggested changing the default maxRowsInMemory to 1000000 (the idea being we'll rely a lot more on maxBytesInMemory), although we can also discuss whether or not that is a good idea. There are some pros and cons. I guess the biggest con would be that there is potential for a well-tuned setup with the current scheme to be thrown out of whack somehow. But I think it's worth it if these defaults are better for the majority of cases. Marked this "release notes" due to the potential change in behavior.
…ce (apache#5579) * Add overlord unsecured paths to coordinator when using combined service * PR comment
* Add more indexing task status and error reporting * PR comments, add support in AppenderatorDriverRealtimeIndexTask * Use TaskReport instead of metrics/context * Fix tests * Use TaskReport uploads * Refactor fire department metrics retrieval * Refactor input row serde in hadoop task * Refactor hadoop task loader names * Truncate error message in TaskStatus, add errorMsg to task report * PR comments
* Allow getDomain to return disjointed intervals * Indentation issues
apache#5551) * Adding feature thetaSketchConstant to do some set operation in PostAggregator * Updated review comments for PR apache#5551 - Adding thetaSketchConstant * Fixed CI build issue * Updated review comments 2 for PR apache#5551 - Adding thetaSketchConstant
* With incremental handoff the changed line is no longer true.
* Add missing doc for automatic pendingSegments * address comments
* Fix indexTask to respect forceExtendableShardSpecs * add comments
Deprecated due to apache#5382
…#5586) Also switch various firehoses to the new method. Fixes apache#5585.
…for ingestion tasks

Currently, a config called 'maxRowsInMemory' controls how much memory gets used for indexing. If this value is not optimal for your JVM heap size, it can lead to an OutOfMemoryError. A lower value leads to frequent persists, which can hurt query performance, while a higher value limits the number of persists but requires more JVM heap space and can lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem: it limits the total number of bytes kept in memory before persisting.

* The default value is Runtime.maxMemory() / 3
* To maintain the current behaviour, set 'maxBytesInMemory' to -1
* If both 'maxRowsInMemory' and 'maxBytesInMemory' are set, both are respected, i.e. the first one to go above its threshold triggers a persist
* Fix the coding style according to druid conventions * Add more javadocs * Rename some variables/methods * Other minor issues
* Some refactoring to put defaults in IndexTaskUtils * Added check for maxBytesInMemory in AppenderatorImpl * Decrement bytes in abandonSegment * Test unit test for multiple sinks in single appenderator * Fix some merge conflicts after rebase
Add back check for 0 maxBytesInMemory in OnHeapIncrementalIndex
gianm left a comment
Thanks for the updates @surekhasaharan!
I wrote some more comments. In addition to the comments on the diff, a couple of general ones:
- Please add doc entries for the new config: a good way to find where to add this is to search the docs for "maxRowsInMemory".
- Please update our tutorials and sample ingestion specs to not have a maxRowsInMemory set. I think some of them have one set explicitly, and that will mess with the attempt here to determine it automatically.
- Please also add some maxBytesInMemory handling to Hadoop indexing, by way of HadoopTuningConfig + IndexGeneratorJob.
| public class IndexTaskUtils | ||
| { | ||
| public static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75_000; |
Now that I think about it some more, this class isn't going to work for these constants.

I'm hoping we can just write them in one place and have other stuff reference them. They are both magical and it's good for magic to be rare. But, one of the things (for example) that has to reference them is HadoopTuningConfig, which is in the hadoop-indexer package.

Maybe the right place to put these is in TuningConfig itself (i.e. io.druid.segment.indexing.TuningConfig).

ok, yeah, when I added the defaults to IndexTaskUtils it didn't cover all the places I had defined this default. Defining those in TuningConfig seems correct. But I am also checking for maxBytesInMemory == 0 in OnheapIncrementalIndex and setting it to the default if 0, and now that seems wrong. Maybe I should instead throw an exception in IncrementalIndex.buildOnHeap. Perhaps now I understand what @jihoonson was talking about.
| public class RealtimeAppenderatorTuningConfig implements TuningConfig, AppenderatorConfig | ||
| { | ||
| private static final int defaultMaxRowsInMemory = 75000; | ||
| private static final int defaultMaxRowsInMemory = 1000000; |
This should use IndexTaskUtils.DEFAULT_MAX_ROWS_IN_MEMORY too.

will change to TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY
| private static final int defaultMaxRowsInMemory = 75000; | ||
| private static final int defaultMaxRowsInMemory = 1000000; | ||
| private static final int defaultMaxRowsPerSegment = 5_000_000; | ||
| private static final long defaultMaxBytesInMemory = getDefaultMaxBytesInMemory(); |
This should use IndexTaskUtils.DEFAULT_MAX_BYTES_IN_MEMORY too.

will change to TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY now.
| @Override | ||
| public long estimateEncodedKeyComponentSize(int[] key) | ||
| { | ||
| /** |
The javadoc-style /** comment isn't appropriate here (those are meant for documenting classes, methods, and fields). It could just be a // style comment since it's one line. And usually we use that style within methods even for multiline comments (although sometimes we use /*).
| this.parseExceptionMessages = parseExceptionMessages; | ||
| } | ||
| public AddToFactsResult( |
I'd suggest getting rid of the other constructor. Having too many legacy constructors makes it easy to accidentally call the wrong constructor and leave out an important parameter.

IMO - in this case, if addToFacts isn't able to generate a bytesInMemory number, it would be better to fess up and explicitly pass in a zero so it's obvious to readers what is going on.

Removed the other constructor and passed 0 for bytesInMemory from OffheapIncrementalIndex.
| // This variable updated in add(), persist(), and drop() | ||
| private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger(); | ||
| private final AtomicInteger totalRows = new AtomicInteger(); | ||
| private final AtomicLong currentBytesInMemory = new AtomicLong(); |
Would prefer to see this named bytesCurrentlyInMemory for a clear analogy with rowsCurrentlyInMemory.

ok, that name is more suitable here.
| private final List<DimensionDesc> dimensionDescsList; | ||
| private final Map<String, ColumnCapabilitiesImpl> columnCapabilities; | ||
| private final AtomicInteger numEntries = new AtomicInteger(); | ||
| private final AtomicLong sizeInBytes = new AtomicLong(); |
Consider calling this bytesInMemory for consistency with the config name, so it's clear that they're related.
| || (tuningConfig.getMaxBytesInMemory() > 0 && currentBytesInMemory.get() >= tuningConfig.getMaxBytesInMemory())) { | ||
| if (allowIncrementalPersists) { | ||
| // persistAll clears rowsCurrentlyInMemory, no need to update it. | ||
| persistAll(committerSupplier == null ? null : committerSupplier.get()); |
Please log the reason we're persisting here. There are starting to be enough conditions that it's going to be useful to see the specific one that got triggered.

Refactored this part a bit; please check the format of the log, if it looks okay.

It looks good, although in the nittiest of nit picks, ", " would be nicer than ",".
| } | ||
| } | ||
| //add methods for byte mem checks |
This looks like a stray comment that you meant to delete?
| return numEntries.get(); | ||
| } | ||
| public long sizeInBytes() |
Consider naming this getBytesInMemory() so it's clear it's related to all the other bytes-in-memory stuff.
* Put defaults for maxRows and maxBytes in TuningConfig * Change/add javadocs * Refactoring and renaming some variables/methods
* Added maxBytesInMemory config in docs * Removed references to maxRowsInMemory under tuningConfig in examples
@surekhasaharan Could you please resolve the conflicts, and we can do another round of review? Thanks!
| int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000; | ||
| // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only | ||
| // tracks active index and not the index being flushed to disk, to account for that | ||
| // we doubled default to 1/6(max jvm memory) |
Nit: it's halved, not doubled.
| |`type`|String|The indexing task type, this should always be `kafka`.|yes| | ||
| |`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 75000)| | ||
| |`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 1000000)| | ||
| |`maxBytesInMemory`|Long|The maximum number of bytes to keep in memory to aggregate before persisting. This is used to manage the required JVM heap size. |no (default == One-sixth of max JVM memory)| |
This should include a comment like maxRowsInMemory's that tells people that the actual max is going to be double (or more, if you set maxPendingPersists higher). It would also be nice to warn people that this is approximate. Maybe something like:

The number of bytes to aggregate in-heap before persisting. This is based on a rough estimate of memory usage, not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).

Similar comments at other locations where this parameter is documented.
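As a worked example of the heap bound quoted in that suggested wording (the class name and the numbers below are hypothetical, chosen only to show the arithmetic):

```java
// Worked example of the documented heap bound: maximum indexing heap usage
// is maxBytesInMemory * (2 + maxPendingPersists). With maxBytesInMemory at
// one sixth of the heap and maxPendingPersists at 0, indexing is bounded at
// roughly one third of the heap.
public class HeapBound
{
  public static long maxIndexingHeap(long maxBytesInMemory, int maxPendingPersists)
  {
    return maxBytesInMemory * (2 + maxPendingPersists);
  }
}
```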
gianm left a comment
LGTM 👍 thanks @surekhasaharan!
This is tagged design review so someone else should take a look too.
I'll finish my review tomorrow.
| |type|The task type, this should always be "index".|none|yes| | ||
| |targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no| | ||
| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|75000|no| | ||
| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|1000000|no| |
Looks like some docs on maxRowsInMemory have gone stale. Can we sync all of them?

I did not understand this; what do you mean by "sync all of them"?

I meant, we might make all documents for maxRowsInMemory of all tuningConfigs the same.
| |targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no| | ||
| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|75000|no| | ||
| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|1000000|no| | ||
| |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no| |
I think some people might be confused by two similar but different configurations. I think it's worthwhile to roughly describe their proper usages.

Agree, it can be confusing, I'll try to add more explanation.
| private static final int defaultMaxRowsInMemory = 75000; | ||
| private static final int defaultMaxRowsInMemory = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; | ||
| private static final int defaultMaxRowsPerSegment = 5_000_000; | ||
| private static final long defaultMaxBytesInMemory = TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY; |
| { | ||
| return new RealtimeTuningConfig( | ||
| defaultMaxRowsInMemory, | ||
| defaultMaxBytesInMemory, |
Is this intentional? Probably it should be null or 0 according to the comment on TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY.
| DEFAULT_SHARD_SPECS, | ||
| DEFAULT_INDEX_SPEC, | ||
| DEFAULT_ROW_FLUSH_BOUNDARY, | ||
| DEFAULT_MAX_BYTES_IN_MEMORY, |
Same here. Is this intentional? Probably it should be null or 0 according to the comment on TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY.
| /** | ||
| * Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only | ||
| * be run in the single-threaded pushExecutor. | ||
| <<<<<<< HEAD |
| public class Sink implements Iterable<FireHydrant> | ||
| { | ||
| private static final IncrementalIndexAddResult ADD_FAILED = new IncrementalIndexAddResult(-1, null); | ||
| } | ||
| } | ||
| tuningConfig.getMaxRowsInMemory() | ||
| )); | ||
| } | ||
| if (tuningConfig.getMaxBytesInMemory() != -1 |
Better to be tuningConfig.getMaxBytesInMemory() > 0.
| // This variable updated in add(), persist(), and drop() | ||
| private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger(); | ||
| private final AtomicInteger totalRows = new AtomicInteger(); | ||
| private final AtomicLong bytesCurrentlyInMemory = new AtomicLong(); |
rowsCurrentlyInMemory, bytesCurrentlyInMemory, and totalRows should be updated in sync because different threads can read/update them, and independent updates might lead to unexpected behavior. For example, rowsCurrentlyInMemory might be already updated, but bytesCurrentlyInMemory is not when another thread reads them.

This should have been done when I added totalRows, but I missed it. Would you add a class to update all these stats atomically? Like

class MemoryStats
{
  Object lock;
  int rowsInMemory;
  long bytesInMemory;
  int rowsInMemoryAndDisk;

  void add(int rowsInMemory, long bytesInMemory, int rowsInMemoryAndDisk)
  {
    synchronized (lock) {
      this.rowsInMemory += rowsInMemory;
      this.bytesInMemory += bytesInMemory;
      this.rowsInMemoryAndDisk += rowsInMemoryAndDisk;
    }
  }
}

There's no point updating these in sync if they are not checked in sync, so for this change to be useful, the check should be moved into MemoryStats as well. But is it really necessary to check them in sync? The non-add threads can only decrease these values. I don't think anything bad will happen if the values decrease while add is in the middle of checking them.

What do you think?

Hmm, thinking about it some more, if drop is called concurrently with add for the same segment then there might be some problems. However, this never happens in practice, since appenderator drivers only call drop after pushing/handing off segments that they are done writing to. Maybe we can just firm up the contract by adding to the javadoc for drop that callers must not call drop concurrently with add for the same segment. I think there isn't much reason to write the code for this possibility, since it isn't meant to actually happen.

What do you think, once again?

Ah, yes. The checking part is missing in the above snippet and it should also be in there.

And yes, it doesn't cause anything bad, but synchronized updates will lead to better behavior like less persisting. Do you have any other concerns?

I want to sort out whether we would be making this change for correctness reasons or aesthetic reasons. If it's for aesthetic reasons, maybe it's not necessary. And if it's for correctness reasons, then we need to make sure we are synchronizing the appropriate amount of things to fix the bug we are trying to fix.

I guess what I'm saying is that it doesn't look to me like "synchronized updates will lead to better behavior like less persisting" is true in practice, since the updates won't be happening simultaneously if the appenderator is being used properly.

It looks to me like this synchronization is only useful if drop and add are called simultaneously [1]. But this shouldn't happen anyway. If it does happen, it will cause worse problems (like data loss, since drop is meant to be used after segments have been pushed/handed off. If they haven't been handed off yet then we will lose data).

[1] The counters are updated in threads that call add, persist, drop, and clear. But add, persist, and clear already have javadoc saying they must be called from the same thread, so there is no synchronization issue. drop is the only one that is allowed to be called from a different thread than the other three.

Ok, I got your point. Thanks.

@surekhasaharan I think probably it would be ok to leave these alone until we figure out what the right approach is here; it might be a separate PR to either clean up the contract or synchronize some more stuff.

@jihoonson let us know what you think too.

@surekhasaharan I talked with @gianm offline. Please see the new comment. If this is the case, rowsCurrentlyInMemory and bytesCurrentlyInMemory no longer have to be AtomicInteger because they are used by the same thread.

If we end up doing the atomic-to-nonatomic change, I think it makes sense to do it in a separate PR. I think we should at least address the KafkaIndexTask publishing being in a separate thread first (#5729). And get rid of persist too.

So my vote is to not worry about it for this particular patch.

Agree. @surekhasaharan if you don't want to do it in this PR, please raise an issue about it.
| // Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks). | ||
| rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory()); | ||
| totalRows.addAndGet(-sink.getNumRows()); | ||
| bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory()); |
This and rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory()); are not necessary because sink should be pushed before calling this method and thus there should be no data in memory. Instead, we need to add a sanity check that sink.getNumRowsInMemory() == 0. Same for getBytesInMemory().
Not clear: where do we want to add this sanity check?
We can add a sanity check instead of rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory()) because sink.getNumRowsInMemory() should always return 0 here. But, as commented #5583 (comment), you don't have to do this in this PR.
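A minimal sketch of the suggested sanity check (hypothetical names; Druid's actual abandonSegment code differs): instead of decrementing the counters for a sink that should already be empty, assert that an abandoned sink holds nothing in memory.

```java
class SinkSanityCheck {
    // By the time a sink is abandoned it should already have been pushed,
    // so its in-memory row and byte counts must both be zero.
    static void checkSinkFullyPersisted(int numRowsInMemory, long bytesInMemory) {
        if (numRowsInMemory != 0 || bytesInMemory != 0) {
            throw new IllegalStateException(
                "Sink still holds " + numRowsInMemory + " rows / " + bytesInMemory
                + " bytes in memory; expected it to be fully persisted before abandon");
        }
    }
}
```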
| { | ||
| private static final int defaultMaxRowsInMemory = 75000; | ||
| private static final int defaultMaxRowsInMemory = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; | ||
| private static final long defaultMaxBytesInMemory = TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY; |
This variable is not used anymore.
| private static final Logger log = new Logger(OnheapIncrementalIndex.class); | ||
| /** | ||
| * overhead per {@link ConcurrentHashMap.Node} object |
This should be "Overhead per {@link ConcurrentHashMap.Node} or {@link ConcurrentSkipListMap.Node} object" because the facts map can be either one of them. Related code:
| private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>(); | ||
| private final FactsHolder facts; | ||
| private final AtomicInteger indexIncrement = new AtomicInteger(0); | ||
| private long maxBytesPerRowForAggregators = 0; |
The initialization to 0 is unnecessary and this can be final.
| |type|The task type, this should always be "index".|none|yes| | ||
| |targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no| | ||
| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|75000|no| | ||
| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally this does not need to be set, but depending on the nature of the data, if rows are short in terms of bytes the user may not want to store a million rows in memory, and this value should be set accordingly.|1000000|no|
Please add this and the below description on maxBytesInMemory to all other places as well.
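For illustration, a native index task tuningConfig using both thresholds documented above might look like the following (the row and byte values are illustrative, not recommendations):

```json
"tuningConfig": {
  "type": "index",
  "maxRowsInMemory": 1000000,
  "maxBytesInMemory": 100000000
}
```

Per the behavior described in this PR, leaving maxBytesInMemory unset makes the task derive a default of one sixth of the JVM's max heap at start time, while setting it to -1 disables the byte-based limit and preserves the old rows-only behaviour.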
jihoonson
left a comment
@surekhasaharan thanks! Looks good to me now.
jon-wei
left a comment
Design LGTM, had a minor comment
| this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; | ||
| this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment; | ||
| // initializing this to 0, it will be lazily initialized to a value |
Ah, it's in more than one place. Will fix.
…e#5583)

* This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks. Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing. If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting.
  * The default value is 1/3(Runtime.maxMemory())
  * To maintain the current behaviour set 'maxBytesInMemory' to -1
  * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist
* Fix check style and remove a comment
* Add overlord unsecured paths to coordinator when using combined service (apache#5579)
* Add overlord unsecured paths to coordinator when using combined service
* PR comment
* More error reporting and stats for ingestion tasks (apache#5418)
* Add more indexing task status and error reporting
* PR comments, add support in AppenderatorDriverRealtimeIndexTask
* Use TaskReport instead of metrics/context
* Fix tests
* Use TaskReport uploads
* Refactor fire department metrics retrieval
* Refactor input row serde in hadoop task
* Refactor hadoop task loader names
* Truncate error message in TaskStatus, add errorMsg to task report
* PR comments
* Allow getDomain to return disjointed intervals (apache#5570)
* Allow getDomain to return disjointed intervals
* Indentation issues
* Adding feature thetaSketchConstant to do some set operation in PostAgg (apache#5551)
* Adding feature thetaSketchConstant to do some set operation in PostAggregator
* Updated review comments for PR apache#5551 - Adding thetaSketchConstant
* Fixed CI build issue
* Updated review comments 2 for PR apache#5551 - Adding thetaSketchConstant
* Fix taskDuration docs for KafkaIndexingService (apache#5572). With incremental handoff the changed line is no longer true.
* Add doc for automatic pendingSegments (apache#5565)
* Add missing doc for automatic pendingSegments
* address comments
* Fix indexTask to respect forceExtendableShardSpecs (apache#5509)
* Fix indexTask to respect forceExtendableShardSpecs
* add comments
* Deprecate spark2 profile in pom.xml (apache#5581). Deprecated due to apache#5382
* CompressionUtils: Add support for decompressing xz, bz2, zip. (apache#5586) Also switch various firehoses to the new method. Fixes apache#5585.
* This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks. Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing. If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting.
  * The default value is 1/3(Runtime.maxMemory())
  * To maintain the current behaviour set 'maxBytesInMemory' to -1
  * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist
* Address code review comments
* Fix the coding style according to druid conventions
* Add more javadocs
* Rename some variables/methods
* Other minor issues
* Address more code review comments
* Some refactoring to put defaults in IndexTaskUtils
* Added check for maxBytesInMemory in AppenderatorImpl
* Decrement bytes in abandonSegment
* Test unit test for multiple sinks in single appenderator
* Fix some merge conflicts after rebase
* Fix some style checks
* Merge conflicts
* Fix failing tests. Add back check for 0 maxBytesInMemory in OnHeapIncrementalIndex
* Address PR comments
* Put defaults for maxRows and maxBytes in TuningConfig
* Change/add javadocs
* Refactoring and renaming some variables/methods
* Fix TeamCity inspection warnings
* Added maxBytesInMemory config to HadoopTuningConfig
* Updated the docs and examples
* Added maxBytesInMemory config in docs
* Removed references to maxRowsInMemory under tuningConfig in examples
* Set maxBytesInMemory to 0 until used. Set the maxBytesInMemory to 0 if user does not set it as part of tuningConfig, and set it to part of max jvm memory when the ingestion task starts
* Update toString in KafkaSupervisorTuningConfig
* Use correct maxBytesInMemory value in AppenderatorImpl
* Update DEFAULT_MAX_BYTES_IN_MEMORY to 1/6 max jvm memory. Experimenting with various defaults, 1/3 jvm memory causes OOM
* Update docs to correct maxBytesInMemory default value
* Minor to rename and add comment
* Add more details in docs
* Address new PR comments
* Address PR comments
* Fix spelling typo
Why is it 1/6 of Runtime.maxMemory?
@hellobabygogo there is a comment in the code about this here. Some more details: while analyzing the heap dump on OOMEs, we found that two
Currently a config called 'maxRowsInMemory' is present which affects how much memory gets
used for indexing. If this value is not optimal for your JVM heap size, it could lead
to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might
be bad for query performance and a higher value will limit number of persists but require
more jvm heap space and could lead to OOM.
'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes
kept in memory before persisting.
* The default value is 1/3(Runtime.maxMemory())
* To maintain the current behaviour set 'maxBytesInMemory' to -1
* If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist
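The interaction of the two limits and the heap-derived default can be sketched as follows (hypothetical helper class; Druid's real logic lives in the appenderator and incremental index). Note the default was later changed from 1/3 to 1/6 of the max JVM memory, per the discussion above.

```java
class PersistThresholds {
    // Resolve the effective byte limit from the configured value:
    // -1 disables the byte-based limit (old rows-only behaviour),
    // 0 (unset) falls back to one sixth of the JVM's max heap.
    static long effectiveMaxBytes(long configuredMaxBytes, long jvmMaxMemory) {
        if (configuredMaxBytes == -1) {
            return Long.MAX_VALUE;
        }
        if (configuredMaxBytes == 0) {
            return jvmMaxMemory / 6;
        }
        return configuredMaxBytes;
    }

    // Whichever threshold is crossed first triggers an intermediate persist.
    static boolean shouldPersist(long rowsInMemory, long bytesInMemory,
                                 long maxRows, long maxBytes) {
        return rowsInMemory >= maxRows || bytesInMemory >= maxBytes;
    }
}
```

For example, with maxRowsInMemory at its default of 1,000,000 and a small byte limit, the byte threshold trips first even though the row count is far below its limit.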