
KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks #8661

Merged
guozhangwang merged 3 commits into apache:trunk from cadonna:AK9603-fix_standbys_segmented_stores
May 20, 2020

Conversation

@cadonna
Member

@cadonna cadonna commented May 12, 2020

Segmented state stores turn on bulk loading of the underlying RocksDB
when restoring. This is correct for segmented state stores that
are in restore mode on active tasks and the onRestoreStart() and
onRestoreEnd() in RocksDBSegmentsBatchingRestoreCallback take care
of toggling bulk loading mode on and off. However, restoreAll()
in RocksDBSegmentsBatchingRestoreCallback might also turn on bulk loading
mode. When this happens on a stand-by task, bulk loading mode is never
turned off. That leads to a steadily increasing number of open file
descriptors in RocksDB, because in bulk loading mode RocksDB continuously
creates new files but never compacts them (which is the intended behaviour).

This PR checks whether the task is an active task before turning on bulk
loading mode in restoreAll() for segmented state stores.
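The guard described above can be sketched as a minimal, self-contained model. The class and field names here (RestoreAllSketch, TaskType, Segment.bulkLoading) are illustrative assumptions, not the actual Kafka Streams classes; only toggleDbForBulkLoading() mirrors a method mentioned in this thread:

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreAllSketch {
    enum TaskType { ACTIVE, STANDBY }

    // Minimal stand-in for a RocksDB segment; only tracks the bulk loading flag.
    static class Segment {
        boolean bulkLoading = false;
        void toggleDbForBulkLoading(boolean prepareForBulkload) {
            bulkLoading = prepareForBulkload;
        }
    }

    // The fix: only active (restoring) tasks switch newly created segments
    // into bulk loading mode; standby tasks keep the normal write mode.
    static void restoreAll(List<Segment> newSegments, TaskType taskType) {
        for (Segment segment : newSegments) {
            if (taskType == TaskType.ACTIVE) {
                segment.toggleDbForBulkLoading(true);
            }
        }
    }

    public static void main(String[] args) {
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment());

        restoreAll(segments, TaskType.STANDBY);
        System.out.println(segments.get(0).bulkLoading); // standby: stays false

        restoreAll(segments, TaskType.ACTIVE);
        System.out.println(segments.get(0).bulkLoading); // active: now true
    }
}
```

On the standby path the segment is never toggled, so the leaked never-turned-off bulk loading mode described in the bug report cannot occur.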

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Member Author


This code is either redundant or wrong. On an active task it is redundant, because bulk loading is turned on and off by onRestoreStart() and onRestoreEnd() in RocksDBSegmentsBatchingRestoreCallback. On a stand-by task it is wrong, because bulk loading is turned on but never off, leading to an ever-increasing number of open file descriptors.

Contributor


I think the original motivation is: what if the segment does not exist yet (e.g., its time range is old) when we turn on bulk loading for all segments? Would that be a problem?

Member Author

@cadonna cadonna May 13, 2020


As far as I understand, the removed code tries to optimize the loading of the segment if there is no data on the client (i.e., the state directory for this segment does not exist or is empty). Looking at the code, this optimization is only done for segmented state stores. We never turn on bulk loading on stand-by tasks for other store types. In other words, for stand-by tasks we do not use the restore callbacks onRestoreStart() and onRestoreEnd(), where bulk loading is turned on and off. If we did, the removed code would be redundant. Furthermore, segment.toggleDbForBulkLoading(true); is called here, but I could not find the corresponding segment.toggleDbForBulkLoading(false);, which makes the code wrong.

Regarding the effect of this "optimization" on active tasks, turning on bulk loading will close and open the segment. However, if the active task is in restore state (which it should be if there is no data on the client), the segment should already have bulk loading turned on. Thus, closing and opening the segment is useless.

So my conclusion is that the removed code is either redundant or wrong. Let me know if I missed anything.

I ran the code provided by https://github.com/lpandzic/kafka-9603 with and without this fix, and with the fix the number of open file descriptors no longer steadily increases.

Maybe it would be possible to also turn on bulk loading for stand-by tasks, if we find good conditions that can be used to turn bulk loading on and off, but that is out of scope for this fix, IMO.

Member

@mjsax mjsax May 13, 2020


From my understanding, we only want to use bulk loading for active tasks. However if the segmented store is empty, onRestoreStart() would not open anything in bulk loading mode, as there is nothing to be opened. During restore, we might create new segments and we want those segments to be opened in bulk load mode; this is what this code does. Later, onRestoreEnd() would switch off bulk loading for all previously existing and all newly created segments.

The bug seems to be that we also open new segments in bulk load mode for standby tasks. Hence, instead of removing the code, the right fix seems to be to check whether the task is active/restoring or a standby, and keep the feature for active/restoring tasks but disable it for standby tasks?

Member


I agree. Nothing else in the bulk loading mode is particularly useful considering how we load standbys at the moment, so we should just disable/skip this unless the task is active.

I only ask that we try and do so in a way that doesn't become a nightmare for the active <--> standby task conversion 🙂

Member Author


OK, now I see what I missed. I should have known that it could not have been that easy. I need to think about a better fix. I will keep active <---> standby conversion in mind.

Contributor


Actually, even for standby tasks it should be beneficial to use bulk loading, right (e.g., if the standby is far behind the active and has a large amount of records to catch up on)?

I'm thinking that in the long run, maybe we could optionally allow restore callbacks to be triggered for standbys as well: we could use a simple heuristic such that if the changelog log-end offset minus the standby task's store offset exceeds a certain threshold, we trigger onRestoreStart(), and then go back from "sprinting" mode to normal mode once we are close enough to the log-end offset.
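The lag heuristic above could look roughly like this hedged sketch (the class name, method name, and threshold value are illustrative assumptions, not Kafka code or configs):

```java
public class StandbyLagHeuristicSketch {
    // Threshold chosen for illustration only; not a real Kafka config.
    static final long SPRINT_THRESHOLD = 10_000L;

    // "Sprint" (e.g., trigger onRestoreStart()) only while the standby's
    // store lags the changelog log-end offset by more than the threshold.
    static boolean shouldSprint(long logEndOffset, long storeOffset) {
        return logEndOffset - storeOffset > SPRINT_THRESHOLD;
    }

    public static void main(String[] args) {
        System.out.println(shouldSprint(1_000_000L, 10_000L));  // far behind: true
        System.out.println(shouldSprint(1_000_000L, 995_000L)); // caught up: false
    }
}
```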

In the meantime, we could maybe hack a bit: set a flag when segment.toggleDbForBulkLoading is called to turn bulk loading on, and reset the flag when it is turned off; then during restoreAll we check the flag to decide whether to enable bulk loading for newly created segments.
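A minimal sketch of this flag idea (hypothetical class and method names, not the actual RocksDBSegmentsBatchingRestoreCallback implementation):

```java
public class BulkLoadFlagSketch {
    private boolean bulkLoadingActive = false;

    // onRestoreStart()/onRestoreEnd() run only for active (restoring) tasks,
    // so the flag is never set on a standby task.
    void onRestoreStart() { bulkLoadingActive = true; }
    void onRestoreEnd() { bulkLoadingActive = false; }

    // Called from restoreAll() for each segment created during restoration:
    // only bulk-load the new segment while an active restore is in flight.
    boolean shouldBulkLoadNewSegment() {
        return bulkLoadingActive;
    }

    public static void main(String[] args) {
        BulkLoadFlagSketch callback = new BulkLoadFlagSketch();
        System.out.println(callback.shouldBulkLoadNewSegment()); // standby path: false
        callback.onRestoreStart();
        System.out.println(callback.shouldBulkLoadNewSegment()); // active restore: true
        callback.onRestoreEnd();
        System.out.println(callback.shouldBulkLoadNewSegment()); // restore done: false
    }
}
```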

Member

@ableegoldman ableegoldman May 13, 2020


Here are the current bulk loading configs:

dbOptions.setMaxBackgroundFlushes(4);                           // more concurrent flush threads
columnFamilyOptions.setDisableAutoCompactions(true);            // turn off automatic compactions
columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30); // effectively never compact L0
columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);    // never throttle writes
columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);        // never stall writes

Setting aside the problems these are causing users even for active tasks, they basically mean "shove everything into the lowest file level and never attempt to limit these writes". This is useful if you're just trying to shove a lot of data into a store as fast as possible but do not necessarily need to use it immediately afterwards, which is (debatably) the right thing for restoring tasks but definitely not appropriate for standbys*. We attempt to restore a batch of records once per main thread loop, which means doing a lot of other stuff in between. There's no reason not to just use normal mode writing for standbys AFAICT -- also, bulk loading will make IQ on standbys pretty annoying at best.

*In the larger scope, perhaps when we move standbys to a separate thread, I'd say we actually should be turning on bulk loading. BUT we need to issue a manual compaction every so often, and ideally not flush them during every commit (related to KAFKA-9450).

Contributor


Yup, that makes sense to me. I'm thinking about the world where standbys (and also restoring tasks) are executed on different threads. The concern about IQ is indeed valid with a large set of un-compacted L0 files.

In the even larger scope, where we would have checkpoints, I believe bulk loading would not be very necessary, since we would not have a huge number of records to catch up on anymore :)

Contributor


Actually, I'm now thinking that once we have moved the ChangelogReader out of the stream thread, we should consider removing the bulk loading logic for everyone.

@cadonna
Member Author

cadonna commented May 12, 2020

Call for review @guozhangwang @ableegoldman @lpandzic

@mjsax mjsax added the streams label May 12, 2020
@guozhangwang
Contributor

test this please

cadonna added 3 commits May 19, 2020 10:08
…by tasks

Segmented state stores turn on bulk loading of the underlying RocksDB
when restoring. This is correct for segmented state stores that
are in restore mode on active tasks and the onRestoreStart() and
onRestoreEnd() in RocksDBSegmentsBatchingRestoreCallback take care
of toggling bulk loading mode on and off. However, restoreAll()
in RocksDBSegmentsBatchingRestoreCallback might also turn on bulk loading
mode. When this happens on a stand-by task, bulk loading mode is never
turned off. That leads to a steadily increasing number of open file
descriptors in RocksDB, because in bulk loading mode RocksDB continuously
creates new files but never compacts them (which is the intended behaviour).

This PR removes the code that turns on bulk loading mode in restoreAll()
for segmented state stores because it is redundant. If the state store
is in restoring mode on an active task, bulk loading is turned on and off
anyway. On stand-by tasks bulk loading mode should never be turned on.
@cadonna cadonna force-pushed the AK9603-fix_standbys_segmented_stores branch from b33da15 to 3170787 on May 19, 2020 09:27
Member

@ableegoldman ableegoldman left a comment


LGTM, thanks for the fix!

@guozhangwang
Contributor

test this please

@guozhangwang
Contributor

test this

@guozhangwang
Contributor

test this

@guozhangwang guozhangwang merged commit 9a47d7e into apache:trunk May 20, 2020
Kvicii pushed a commit to Kvicii/kafka that referenced this pull request May 21, 2020
* 'trunk' of github.com:apache/kafka:
  MINOR: Increase gradle daemon’s heap size to 2g (apache#8700)
  KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks (apache#8661)
  KAFKA-9859 / kafka-streams-application-reset tool doesn't take into account topics generated by KTable foreign key join operation (apache#8671)
  MINOR: Fix redundant typos in comments and javadocs (apache#8693)
  KAFKA-10010: Should make state store registration idempotent (apache#8681)
  KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll (apache#8682)
  KAFKA-9992: Eliminate JavaConverters in EmbeddedKafkaCluster (apache#8673)
  KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173 (apache#8689)
  MINOR: Update stream documentation (apache#8622)
  MINOR: Small fixes in the documentation (apache#8623)
@cadonna cadonna deleted the AK9603-fix_standbys_segmented_stores branch May 22, 2020 12:08
vvcephei pushed a commit that referenced this pull request Jul 8, 2020
KAFKA-9603 reports that the number of open files keeps increasing
in RocksDB. The reason is that bulk loading is turned on but
never turned off in segmented state stores for standby tasks.

This bug was fixed in 2.6 through PR #8661, using code that is not present
in 2.5, so cherry-picking was not possible.

Reviewers: John Roesler <vvcephei@apache.org>