Kinesis adaptive memory management #15360
Conversation
For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller.
- `maxRecordsPerPoll`: 1.
Should this be higher? I wonder if this is too low in the case of non-aggregated records.
I wondered the same, actually. To be honest, I'm not sure. I think validating this requires extensive performance testing.
Changed it so that it polls at least one record, and at most 1_000_000 bytes when there is more than one record, which is what we were targeting before.
So does that mean we should update the maxRecordsPerPoll: 1 here?
);
int maxFetchThreads = Math.max(
    1,
    (int) (memoryToUse / 10_000_000L)
nit: maybe use a constant for the 10 MB limit, with a comment explaining that the limit comes from the Kinesis library.
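A minimal sketch of what that suggestion could look like. The constant name mirrors the `GET_RECORDS_MAX_BYTES_PER_CALL` that appears later in the diff, and `memoryToUse` is the value from the surrounding code above; the exact comment text and placement are illustrative, not the PR's final code:

```java
// GetRecords returns at most 10 MB per call, per the Kinesis service limits:
// https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
private static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;

int maxFetchThreads = Math.max(
    1,
    (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL)
);
```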
Boolean resetOffsetAutomatically,
Boolean skipSequenceNumberAvailabilityCheck,
Integer recordBufferSize,
@Nullable Integer recordBufferSizeBytes,
Do you think it'd make sense to log a warning if the eliminated property is provided?
Good thought, will add.
scheduleBackgroundFetch(recordBufferFullWait);
return;
recordBufferOfferWaitMillis = recordBufferFullWait;
How come the shardIterator doesn't need to be reset here as before?
Previously, when the record buffer was full here, the fetchRecords logic threw away the rest of the GetRecords result after recordBufferOfferTimeout and started a new shard iterator. This seemed excessively churny. Instead, we now wait an unbounded amount of time for the queue to stop being full. If the queue remains full, we'll end up right back waiting for it after the restarted fetch.
Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
try {
  Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
Are the points about the licensing above still correct? Looks like amazon-kinesis-client is Apache licensed now: https://github.com/awslabs/amazon-kinesis-client/blob/master/LICENSE.txt
Removed the licensing comments.
Oh, it even looks like since #12370, amazon-kinesis-client with an Apache license is a regular dependency. So this reflective stuff is no longer needed. Please either rewrite it to use regular Java calls, or if you don't rewrite it, include a comment describing the situation. Something like:
The deaggregate function is implemented by the amazon-kinesis-client, whose license was formerly not compatible with Apache. The code here avoids the license issue by using reflection, but is no longer necessary since amazon-kinesis-client is now Apache-licensed and is now a dependency of Druid. This code could safely be modified to use regular calls rather than reflection.
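For reference, a hedged sketch of what the non-reflective version being asked for might look like, assuming the KCL 1.x static helper `UserRecord.deaggregate(List<Record>)`; this is illustrative only and is not the PR's actual change:

```java
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.Record;

import java.util.List;

class DeaggregationExample
{
  // Call the KCL deaggregation helper directly instead of going through MethodHandles.
  static List<UserRecord> deaggregate(List<Record> records)
  {
    return UserRecord.deaggregate(records);
  }
}
```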
.forEachOrdered(newQ::offer);
.filter(x -> !partitions.contains(x.getData().getStreamPartition()))
.forEachOrdered(x -> {
  if (!newQ.offer(x)) {
Is this a new failure mode? What would've happened in the old code if the queue size was exceeded?
It is a new failure mode. I believe that if the data was not added here, it could have resulted in data loss. Any other suggestions here? I was a little concerned about this too, but I think potential data loss is worse.
Added a comment saying that this shouldn't really happen in practice, but the check is there for safety.
Tagged "release notes" since various memory-related configs are changed.
records.drain(
    polledRecords,
    expectedSize,
    MAX_BYTES_PER_POLL,
It looks like `maxRecordsPerPoll` isn't doing anything anymore. Is that right? If so, let's get rid of it.
Removed, and added `maxBytesPerPoll`, which is used instead now.
.filter(x -> !partitions.contains(x.getData().getStreamPartition()))
.forEachOrdered(x -> {
  if (!newQ.offer(x)) {
    // this should never really happen in practice but adding check here for safety.
Checks that should never happen, but are there for safety, should use `DruidException.defensive`.
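A minimal sketch of that suggestion applied to the diff above, assuming Druid's `DruidException.defensive(String, Object...)` factory as the reviewer suggests; the message text is illustrative:

```java
if (!newQ.offer(x)) {
  // Should never happen in practice: the new queue is sized to hold everything we retain.
  throw DruidException.defensive("Could not add record [%s] to the record buffer", x);
}
```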
// If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
// from this message and back off for a bit to let the buffer drain before retrying.
if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
The comment above is no longer accurate -- we aren't grabbing new stream iterators anymore when the buffer is full.
);
this.useListShards = useListShards;
this.awsCredentialsConfig = awsCredentialsConfig;
if (tuningConfig.getRecordBufferSizeConfigured() != null) {
Please move these two checks to run() rather than the constructor, because we don't need to log this stuff every time a task object is constructed. (That happens at various points on the Overlord due to various API calls and internal machinations, and would create a lot of log spam.)
Good catch. Moved.
(int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL)
);
if (fetchThreads > maxFetchThreads) {
  log.warn("fetchThreads [%d] being lowered to [%d]", fetchThreads, maxFetchThreads);
This warning should only get logged if `configuredFetchThreads != null`. There's no reason to log it if `runtimeInfo.getAvailableProcessors() * 2` is lower than `maxFetchThreads`.
Good catch, updated.
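A hedged sketch of the guarded warning being discussed, assuming `configuredFetchThreads` holds the user-supplied value (null when unset) and that the default is `runtimeInfo.getAvailableProcessors() * 2`; the names follow the comment above, but the exact code is illustrative rather than the PR's final version:

```java
int fetchThreads = configuredFetchThreads != null
                   ? configuredFetchThreads
                   : runtimeInfo.getAvailableProcessors() * 2;

if (fetchThreads > maxFetchThreads) {
  // Only warn when the user explicitly asked for more threads than the memory-based cap allows.
  if (configuredFetchThreads != null) {
    log.warn("fetchThreads [%d] being lowered to [%d]", configuredFetchThreads, maxFetchThreads);
  }
  fetchThreads = maxFetchThreads;
}
```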
| @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, | ||
| @JsonProperty("awsExternalId") String awsExternalId, | ||
| @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig, | ||
| @JsonProperty("deaggregate") boolean deaggregate |
The `recordsPerFetch` and `deaggregate` properties should stay here for better compatibility during rolling updates and rollbacks. (We don't want to lose track of them prior to a potential rollback.)
So let's instead mark them deprecated, but keep them.
Added back and marked as deprecated.
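A rough sketch of keeping the deprecated properties around for serialization compatibility, assuming a Jackson-based config class like the one in the diff above; the class name and field layout here are illustrative, not the PR's exact code:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

class ExampleKinesisIoConfig
{
  // Retained only so configs written by older versions still round-trip during
  // rolling updates and rollbacks; no longer used by the fetch logic.
  @Deprecated
  private final Integer recordsPerFetch;
  @Deprecated
  private final boolean deaggregate;

  @JsonCreator
  ExampleKinesisIoConfig(
      @Nullable @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
      @JsonProperty("deaggregate") boolean deaggregate
  )
  {
    this.recordsPerFetch = recordsPerFetch;
    this.deaggregate = deaggregate;
  }
}
```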
records.drain(
    polledRecords,
    expectedSize,
    maxBytesPerPoll,
What happens if a single record is larger than `maxBytesPerPoll`? Would this get stuck and make no progress?
Good question: it always drains at least one record; clarified that in the docs. I added a test for this, see org.apache.druid.java.util.common.MemoryBoundLinkedBlockingQueueTest#test_drain_queueWithFirstItemSizeGreaterThanLimit_succeeds.
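To illustrate the behavior described here (this is not the actual `MemoryBoundLinkedBlockingQueue` implementation), a byte-bounded drain that always removes at least one element, so a single record larger than the byte limit cannot stall the poll:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class ByteBoundedDrainExample
{
  interface Sized
  {
    long sizeBytes();
  }

  // Drain up to maxBytes worth of items, but always take at least one so that a single
  // item larger than maxBytes still makes progress.
  static <T extends Sized> List<T> drain(Queue<T> queue, long maxBytes)
  {
    final List<T> out = new ArrayList<>();
    long bytes = 0;
    T head;
    while ((head = queue.peek()) != null) {
      if (!out.isEmpty() && bytes + head.sizeBytes() > maxBytes) {
        break;
      }
      out.add(queue.poll());
      bytes += head.sizeBytes();
    }
    return out;
  }
}
```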
Description
Our Kinesis consumer works by using the GetRecords API in some number of `fetchThreads`, each fetching some number of records (`recordsPerFetch`) and each inserting into a shared buffer that can hold a `recordBufferSize` number of records. The logic is described in our documentation at: https://druid.apache.org/docs/27.0.0/development/extensions-core/kinesis-ingestion/#determine-fetch-settings

There is a problem with the logic that this PR fixes: the memory limits rely on a hard-coded "estimated record size" that is 10 KB if `deaggregate: false` and 1 MB if `deaggregate: true`. There have been cases where a supervisor had `deaggregate: true` set even though it wasn't needed, leading to under-utilization of memory and poor ingestion performance.

Users don't always know if their records are aggregated or not. Also, even if they could figure it out, it's better not to have to. So we'd like to eliminate the `deaggregate` parameter, which means we need to do memory management more adaptively based on the actual record sizes.

We take advantage of the fact that GetRecords doesn't return more than 10 MB (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html):
This PR:

- Eliminates `recordsPerFetch`; always use the max limit of 10,000 records (the default limit if not set).
- Eliminates `deaggregate`; always have it true.
- Caps `fetchThreads` to ensure that if each fetch returns the max (10 MB) then we don't exceed our budget (100 MB or 5% of heap). In practice this means `fetchThreads` will never be more than 10. Tasks usually don't have that many processors available to them anyway, so in practice I don't think this will change the number of threads for too many deployments.
- Adds `recordBufferSizeBytes` as a bytes-based limit rather than a records-based limit for the shared queue. We do know the byte size of Kinesis records by this point. Default should be 100 MB or 10% of heap, whichever is smaller.
- Adds `maxBytesPerPoll` as a bytes-based limit for how much data we poll from the shared buffer at a time. Default is 1,000,000 bytes.
- Deprecates `recordBufferSize`; use `recordBufferSizeBytes` instead. A warning is logged if `recordBufferSize` is specified.
- Deprecates `maxRecordsPerPoll`; use `maxBytesPerPoll` instead. A warning is logged if `maxRecordsPerPoll` is specified.
- Fixes an issue where, when the record buffer is full, the fetchRecords logic throws away the rest of the GetRecords result after `recordBufferOfferTimeout` and starts a new shard iterator. This seemed excessively churny. Instead, wait an unbounded amount of time for the queue to stop being full. If the queue remains full, we'll end up right back waiting for it after the restarted fetch.
- There was also a call to `newQ::offer` without a check in `filterBufferAndResetBackgroundFetch`, which seemed like it could cause data loss. Now checking the return value here, and failing if false.
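A hedged sketch of how the new defaults described above fit together. The constants and derivations follow the numbers in this description, but the method and variable names are illustrative and may differ from Druid's actual code:

```java
class KinesisDefaultsSketch
{
  static void printDefaults()
  {
    // Assumed defaults, per the description above (illustrative, not Druid's exact code).
    final long heapBytes = Runtime.getRuntime().maxMemory();

    // recordBufferSizeBytes: 100 MB or 10% of heap, whichever is smaller.
    final long recordBufferSizeBytes = Math.min(100_000_000L, (long) (heapBytes * 0.10));

    // Fetch memory budget: 100 MB or 5% of heap, whichever is smaller.
    final long fetchMemoryBudget = Math.min(100_000_000L, (long) (heapBytes * 0.05));

    // Each GetRecords call can return at most 10 MB, so cap fetchThreads by the budget
    // (which works out to at most 10 threads in practice).
    final int maxFetchThreads = Math.max(1, (int) (fetchMemoryBudget / 10_000_000L));

    // maxBytesPerPoll: bytes-based limit on each poll from the shared buffer.
    final long maxBytesPerPoll = 1_000_000L;

    System.out.printf(
        "recordBufferSizeBytes=%d, maxFetchThreads=%d, maxBytesPerPoll=%d%n",
        recordBufferSizeBytes, maxFetchThreads, maxBytesPerPoll
    );
  }
}
```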
Release Note

Kinesis ingestion memory tuning config has been greatly simplified, and a more adaptive approach is now taken for the configuration. Here is a summary of the changes made:

- Eliminates `recordsPerFetch`; always use the max limit of 10,000 records (the default limit if not set).
- Eliminates `deaggregate`; always have it true.
- Caps `fetchThreads` to ensure that if each fetch returns the max (10 MB) then we don't exceed our budget (100 MB or 5% of heap). In practice this means `fetchThreads` will never be more than 10. Tasks usually don't have that many processors available to them anyway, so in practice I don't think this will change the number of threads for too many deployments.
- Adds `recordBufferSizeBytes` as a bytes-based limit rather than a records-based limit for the shared queue. We do know the byte size of Kinesis records by this point. Default should be 100 MB or 10% of heap, whichever is smaller.
- Adds `maxBytesPerPoll` as a bytes-based limit for how much data we poll from the shared buffer at a time. Default is 1,000,000 bytes.
- Deprecates `recordBufferSize`; use `recordBufferSizeBytes` instead. A warning is logged if `recordBufferSize` is specified.
- Deprecates `maxRecordsPerPoll`; use `maxBytesPerPoll` instead. A warning is logged if `maxRecordsPerPoll` is specified.