
Remediate ingestion failures when number of segments in time period is larger than 32767#15090

Closed
dulu98Kurz wants to merge 4 commits into apache:master from dulu98Kurz:dun_bugfix_statemap

Conversation

@dulu98Kurz

@dulu98Kurz dulu98Kurz commented Oct 4, 2023

Fixes #15091.

Description

This PR attempts to remediate the exception java.lang.IllegalArgumentException: fromKey > toKey, which occurs when the number of segments is larger than Java's Short.MAX_VALUE (32767). Without full context on why we limit the number of segments in a time period to the range of a Java short, this is what I believe could keep ingestion going.
I can send a different PR if it is appropriate to change the number of segments to be in the range of int instead of short, which requires a larger scope of changes.
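For illustration only (this is a standalone sketch, not Druid code), the narrowing cast at the root of this failure can be reproduced in isolation:

```java
public class ShortOverflowDemo {
    public static void main(String[] args) {
        // Java's narrowing conversion keeps only the low 16 bits of the int,
        // so the first value past Short.MAX_VALUE wraps to a negative short.
        int partitionId = 32768;           // Short.MAX_VALUE + 1
        short cast = (short) partitionId;
        System.out.println(cast);          // -32768

        // A negative fromKey paired with a positive toKey is what triggers
        // java.lang.IllegalArgumentException: fromKey > toKey in a sorted map.
    }
}
```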

Fixed the bug

#15091.

Renamed the class

None

Added a forbidden-apis entry ...

None

Release note

Prevent ingestion failures when the number of segments in a time period exceeds 32767


Key changed/added classes in this PR
  • OvershadowableManager

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@dulu98Kurz dulu98Kurz changed the title from "Dun bugfix statemap" to "Remediate ingestion failures when number of segments in time period is larger than 32767" on Oct 4, 2023
@dulu98Kurz
Author

Let me know if it is appropriate to refactor partitionId to int instead of short, which I believe would solve this problem more completely.

Contributor

@kfaraz kfaraz left a comment


Thanks for the changes, @dulu98Kurz ! I have left some comments.

{
final RootPartitionRange lowFench = new RootPartitionRange(partitionId, partitionId);
// remediate submap `fromKey > toKey` issue when partitionId overflows
final short partitionIdLowFence = partitionId < 0 ? Short.MAX_VALUE : partitionId;
Contributor


I think just checking the argument in the constructor of RootPartitionRange is enough. Since RootPartitionRange does not accept a startPartitionId or endPartitionId less than 0, we will not have a case where the partitionId passed to this method is less than 0.
And even if it is, we should throw an exception rather than silently converting it to the max value.
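A minimal sketch of the validation suggested here; the class and method names follow the quoted snippets in this thread, but the body is illustrative rather than the actual Druid source:

```java
// Hypothetical sketch: fail fast in the constructor instead of silently
// wrapping on the int -> short narrowing cast. Not the real Druid class.
class RootPartitionRange {
    private final short startPartitionId;
    private final short endPartitionId;

    private RootPartitionRange(short startPartitionId, short endPartitionId) {
        if (startPartitionId < 0 || endPartitionId < 0) {
            throw new IllegalArgumentException(
                "Partition ids must be non-negative, got ["
                + startPartitionId + ", " + endPartitionId + "]");
        }
        this.startPartitionId = startPartitionId;
        this.endPartitionId = endPartitionId;
    }

    static RootPartitionRange of(int startPartitionId, int endPartitionId) {
        // If the int argument exceeded Short.MAX_VALUE, the cast below would
        // produce a negative short, which the constructor now rejects.
        return new RootPartitionRange((short) startPartitionId, (short) endPartitionId);
    }
}
```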

Author

@dulu98Kurz dulu98Kurz Oct 6, 2023


Thanks for checking on this, @kfaraz! The constructor of RootPartitionRange that takes a short startPartitionId is private, so we are forced to use:

static RootPartitionRange of(int startPartitionId, int endPartitionId)
    {
      return new RootPartitionRange((short) startPartitionId, (short) endPartitionId);
    }

Since startPartitionId is in int range, when startPartitionId > Short.MAX_VALUE (32767) the cast from int to short starts producing negative numbers, and the cycle repeats as startPartitionId keeps growing, because casting from int to short loses precision.
We ran into this short-overflow scenario described in #15091 and our ingestion task for new data was completely broken because of it; throwing an exception would still make the ingestion fail.
Here I am setting startPartitionId to Short.MAX_VALUE so that it won't produce java.lang.IllegalArgumentException: fromKey > toKey and break ingestion when we do stateMap.subMap(lowFence, false, highFence, false); this is just a remediation.

I believe a better way to handle this is to allow startPartitionId and endPartitionId to be int and avoid the problematic precision-loss cast. I can send another PR with that solution if you can confirm why we had this short limit originally and that it is appropriate to do so.

Best,
Dun

Author

@dulu98Kurz dulu98Kurz Oct 9, 2023


Hi @kfaraz thanks again for spending time on this!
Bumping again just in case you missed my last message. I'm more than glad to help if anything needs further clarification, and I can also connect on a Zoom call if it's convenient for you!

Best Regards,
Dun

Contributor


@dulu98Kurz - I am not entirely sure, but short was likely chosen to save on the memory that storing these ids takes. 32K partitions in one single interval is too high. Can you describe a bit more how your cluster ends up in this situation, and why that is a genuine scenario? In my experience, almost every time an interval touches this high a number, it means that compaction is not configured or ingestion is misconfigured.

Author


Hi @abhishekagarwal87, thanks for checking on this!
You are right: our investigation suggests both late messages from upstream and compaction falling behind. Specifically, we found random late messages mixed into the Kafka topics; they kept adding tiny segments to finalized time chunks and eventually went beyond the short range, breaking live ingestion tasks for new data. Setting a rejection period was not ideal because it means we would lose data, and because compaction was falling behind we could not afford to wait for it to catch up. I ended up hard-deleting the problematic time chunk, and then I realized that relying solely on compaction seems inadequate.

Admittedly, handling random late messages is not an ideal use case for Druid, but it was a really difficult choice when the user has to pick between letting ingestion break and deleting the problematic time chunk.

Author


So instead of capping at Short.MAX_VALUE, we could possibly cover the gap by:

  • Allowing partitionId to go into the int range
  • Logging error messages that strongly remind users to compact or reduce the number of segments.

For users who do not have late messages or compaction issues, this change has no impact, because they won't store more than Short.MAX_VALUE segments anyway, so we don't break the original intention of saving memory.

For users who actually produce segments beyond Short.MAX_VALUE, this buys them more time to compact or reduce the number of segments, which may eventually avoid the difficult situation above.

Author

@dulu98Kurz dulu98Kurz Oct 9, 2023


From a code quality perspective, Short.toUnsignedInt is a precision-loss conversion and we use it 18 times across 2 files; we could simplify the logic and improve readability by changing to int.
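As a standalone illustration of that round-trip (not Druid code), a short that has wrapped negative can be re-read as an unsigned value, but the raw short itself no longer compares correctly:

```java
public class ToUnsignedIntDemo {
    public static void main(String[] args) {
        short wrapped = (short) 40000;                     // narrows to -25536
        System.out.println(wrapped);                       // -25536
        System.out.println(Short.toUnsignedInt(wrapped));  // 40000

        // Short.toUnsignedInt recovers values up to 65535, but ordering
        // comparisons on the raw short are still wrong (wrapped < 0),
        // which is why plain int ids would simplify the code.
    }
}
```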

Lastly, when partitionId is out of range, the logic we use to handle it right now is simply wrong:

final RootPartitionRange lowFench = new RootPartitionRange(partitionId, partitionId);
final RootPartitionRange highFence = new RootPartitionRange(Short.MAX_VALUE, Short.MAX_VALUE);
return stateMap.subMap(lowFench, false, highFence, false).entrySet().iterator();

For example, when partitionId has wrapped around to a negative value, stateMap.subMap(lowFench, false, highFence, false) will return all entries instead of an empty result ...
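A simplified stand-in (plain short keys instead of RootPartitionRange, so illustrative only) showing how a wrapped-negative low fence makes the open range cover the whole map:

```java
import java.util.SortedMap;
import java.util.TreeMap;

public class SubMapDemo {
    public static void main(String[] args) {
        // Simplified stand-in for the stateMap keyed by short partition ids.
        TreeMap<Short, String> stateMap = new TreeMap<>();
        stateMap.put((short) 0, "a");
        stateMap.put((short) 100, "b");
        stateMap.put((short) 20000, "c");

        // After the narrowing cast, a partitionId just past Short.MAX_VALUE
        // becomes -32768, which sorts below every real key.
        short lowFence = (short) 32768;  // -32768
        SortedMap<Short, String> sub =
            stateMap.subMap(lowFence, false, Short.MAX_VALUE, false);
        System.out.println(sub.size());  // 3 -- every entry, instead of none
    }
}
```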

If we are OK with the remediation in this PR, we can proceed with merging; if we are OK with refactoring, please allow me to send another PR to fix it more completely.

Author


Adding the PR that refactors partitionId from short to int, so that we can compare the scope of changes:
#15116

@dulu98Kurz
Author

Hi @kfaraz @abhishekagarwal87, sorry for being verbose on the thread. I hope I have described our issue well; please let me know your thoughts. I'm open to any solution that can keep our ingestion alive without deleting historical data.

@cryptoe
Contributor

cryptoe commented Oct 19, 2023

@dulu98Kurz Thank you for the patience. Since most of the committers are busy with Druid 28 work, there might be some delay.

I think we have an ugly fail-safe here. What I would prefer is to change the exception message to something nicer, so that the user does not need to read through Druid's code base.

As @abhishekagarwal87 said, 32K partitions per interval just seems too massive and is generally a sign that either compaction is lagging behind or late data is coming in.
If we change this variable to int, the ticking time bomb has a larger impact and can lead to a full cluster outage where the number of segments in the cluster balloons into the millions.

I think if we do want to change this to int, we should add a guard rail on the number of partitions per interval, raising the implicit 32K to something larger, maybe 50K?
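A hypothetical sketch of such a guard rail; the class name, cap value, and message wording are illustrative only and do not correspond to actual Druid constants:

```java
// Illustrative guard rail: keep partition ids as int, but reject intervals
// that exceed a configurable cap, with an error message that tells the user
// what to do instead of surfacing a raw sorted-map exception.
final class PartitionIdGuard {
    // Example cap per the discussion above; not a real Druid constant.
    static final int MAX_PARTITIONS_PER_INTERVAL = 50_000;

    static int checkPartitionId(int partitionId) {
        if (partitionId < 0 || partitionId >= MAX_PARTITIONS_PER_INTERVAL) {
            throw new IllegalArgumentException(
                "Too many segments in one time chunk (partitionId=" + partitionId
                + ", max=" + MAX_PARTITIONS_PER_INTERVAL
                + "). Consider enabling compaction or using a coarser segment granularity.");
        }
        return partitionId;
    }
}
```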

@dulu98Kurz
Author

@cryptoe thanks for checking on this!
Agreed, letting the number of segments grow wildly toward Integer.MAX_VALUE would definitely accumulate a bigger problem.

I'll update the PR to include a cap of 50K, or 65536 if we want to go by binary convention, and also make the exception message more meaningful. I will send the update soon; thanks again for the attention, @cryptoe!

@dulu98Kurz
Author

Hi @cryptoe @abhishekagarwal87 @kfaraz, please find the updates in PR #15116; it includes the changes per our discussion:

  1. A more informative exception message
  2. partitionId in RootPartitionRange refactored from short to int, but with a max value of 65536 to keep memory pressure under control.
  3. Unnecessary, problematic short-to-int conversions removed.

Please let me know of any concerns; I'll close this PR once #15116 is merged.

@github-actions

github-actions Bot commented Mar 7, 2024

This pull request has been marked as stale due to 60 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If you think
that's incorrect or this pull request should instead be reviewed, please simply
write any comment. Even if closed, you can still revive the PR at any time or
discuss it on the dev@druid.apache.org list.
Thank you for your contributions.

@github-actions github-actions Bot added the stale label Mar 7, 2024
@github-actions

github-actions Bot commented Apr 5, 2024

This pull request/issue has been closed due to lack of activity. If you think that
is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions Bot closed this Apr 5, 2024

Successfully merging this pull request may close these issues.

Ingestion failure when number of segments in time trunk exceeded Short.MAX_VALUE