@@ -39,6 +39,7 @@
import it.unimi.dsi.fastutil.shorts.ShortSortedSets;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.Overshadowable;

import javax.annotation.Nullable;
@@ -85,6 +86,7 @@ enum State
OVERSHADOWED
}

private static final Logger log = new Logger(OvershadowableManager.class);
private final Map<Integer, PartitionChunk<T>> knownPartitionChunks; // served segments

// (start partitionId, end partitionId) -> minorVersion -> atomicUpdateGroup
@@ -418,9 +420,11 @@ private Iterator<Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup
TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap
)
{
-final RootPartitionRange lowFench = new RootPartitionRange(partitionId, partitionId);
+// remediate submap `fromKey > toKey` issue when partitionId overflows
+final short partitionIdLowFence = partitionId < 0 ? Short.MAX_VALUE : partitionId;
Contributor:
I think just checking the argument in the constructor of RootPartitionRange is enough. Since RootPartitionRange does not accept a startPartitionId or endPartitionId less than 0, we will never have a case where the partitionId passed to this method is negative.
And even if we did, we should throw an exception rather than silently converting it to the max value.
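A minimal sketch of the fail-fast check being suggested here (the class and message below are hypothetical stand-ins, not the actual Druid RootPartitionRange):

```java
// Hypothetical sketch: reject negative partition ids at construction time
// instead of silently clamping them. Names are assumptions for illustration.
class RootPartitionRangeSketch {
    final short startPartitionId;
    final short endPartitionId;

    RootPartitionRangeSketch(short startPartitionId, short endPartitionId) {
        if (startPartitionId < 0 || endPartitionId < 0) {
            // a negative short here almost certainly means an int-to-short overflow upstream
            throw new IllegalArgumentException(
                "Negative partition id [" + startPartitionId + ", " + endPartitionId
                + "]: likely an int-to-short overflow"
            );
        }
        this.startPartitionId = startPartitionId;
        this.endPartitionId = endPartitionId;
    }
}
```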

Author (@dulu98Kurz, Oct 6, 2023):
Thanks for checking on this, @kfaraz! The constructor of RootPartitionRange that takes a short startPartitionId is private, so callers are forced to go through:

static RootPartitionRange of(int startPartitionId, int endPartitionId)
    {
      return new RootPartitionRange((short) startPartitionId, (short) endPartitionId);
    }

Since startPartitionId has the full int range, once it exceeds Short.MAX_VALUE (32767) the int-to-short cast starts producing negative numbers, and the pattern keeps repeating as startPartitionId grows, because the cast truncates the high bits.
We ran into exactly this short-overflow scenario, described in #15091, and our ingestion tasks for new data were completely broken by it; throwing an exception would still make the ingestion fail.
Here I am setting startPartitionId to Short.MAX_VALUE so that stateMap.subMap(lowFence, false, highFence, false) does not throw java.lang.IllegalArgumentException: fromKey > toKey and break ingestion; this is just a remediation.
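The cast behavior described above can be demonstrated in isolation (a standalone snippet, not Druid code):

```java
// Demonstrates how an int partitionId wraps around when narrowed to short:
// the cast keeps only the low 16 bits, so values cycle every 65536.
class ShortCastDemo {
    public static void main(String[] args) {
        int startPartitionId = Short.MAX_VALUE + 1; // 32768
        short cast = (short) startPartitionId;      // high bits truncated
        System.out.println(cast);                   // prints -32768

        System.out.println((short) 65536);          // prints 0
        System.out.println((short) 98304);          // prints -32768 again
    }
}
```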

I believe a better way to handle this is to make startPartitionId and endPartitionId int and avoid the problematic precision-losing cast entirely. I can send another PR with that solution if you can confirm why the short limit was chosen originally and whether widening it is appropriate.

Best,
Dun

Author (@dulu98Kurz, Oct 9, 2023):
Hi @kfaraz, thanks again for spending time on this!
Bumping this in case you missed my last message. I'm more than glad to clarify anything further, and I can also hop on a Zoom call if that's convenient for you!

Best Regards,
Dun

Contributor:
@dulu98Kurz - I am not entirely sure, but short was likely chosen to save on the memory that storing these ids takes. 32K partitions in one single interval is too high. Can you describe a bit more how your cluster ends up in this situation and why that is a genuine scenario? In my experience, almost every time an interval touches this high a number, compaction is not configured or ingestion is misconfigured.

Author:
Hi @abhishekagarwal87, thanks for checking on this!
You are right: our investigation suggests both late messages from upstream and compaction falling behind. Specifically, we found random late messages mixed into the Kafka topics; they kept adding tiny segments to finalized time chunks until the partition ids went beyond the short range and broke live ingestion tasks for new data. Setting a rejection period was not ideal because it means losing data, and because compaction was falling behind we could not afford to wait for it to catch up. I ended up hard-deleting the problematic time chunk, and then I realized that relying solely on compaction seems inadequate.

Admittedly, handling random late messages is not an ideal use case for Druid, but it is a really difficult choice when the user has to pick between broken ingestion and deleting the problematic time chunk.

Author:

So instead of capping at Short.MAX_VALUE, we could cover the gap by:

  • Allowing partitionId to use the full int range
  • Logging error messages to strongly remind users that they need to compact or reduce the number of segments

For users who do not have late messages or compaction issues, this change has no impact, because they won't store more than Short.MAX_VALUE segments anyway, so we don't break the original intention of saving memory.

For users who can actually produce more segments than Short.MAX_VALUE, this buys them more time to compact or reduce the number of segments, which may avoid the difficult situation above.

Author (@dulu98Kurz, Oct 9, 2023):
From a code-quality perspective, the Short.toUnsignedInt conversion only papers over the sign issue, and we use it 18 times across 2 files; switching to int would let us simplify the logic and improve readability.

Lastly, when partitionId is out of range, the logic we currently use to handle it is simply wrong:

final RootPartitionRange lowFench = new RootPartitionRange(partitionId, partitionId);
final RootPartitionRange highFence = new RootPartitionRange(Short.MAX_VALUE, Short.MAX_VALUE);
return stateMap.subMap(lowFench, false, highFence, false).entrySet().iterator();

For example, when partitionId has wrapped around to a negative short, stateMap.subMap(lowFench, false, highFence, false) returns all entries instead of an empty map, because the negative low fence sorts below every valid partitionId.
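This subMap behavior can be reproduced with a plain TreeMap standing in for stateMap (keys simplified to bare shorts here; the real map is keyed by RootPartitionRange):

```java
import java.util.TreeMap;

// Shows why a wrapped-around (negative) low fence makes subMap return
// every entry instead of none: the negative key sorts below all valid ids.
class SubMapOverflowDemo {
    public static void main(String[] args) {
        TreeMap<Short, String> stateMap = new TreeMap<>();
        stateMap.put((short) 100, "group-a");
        stateMap.put((short) 32000, "group-b");

        // an int partitionId of 32768 wraps to -32768 when cast to short
        short lowFence = (short) 32768;
        short highFence = Short.MAX_VALUE;

        // -32768 < 100 < 32000 < 32767, so BOTH entries land in the submap
        System.out.println(stateMap.subMap(lowFence, false, highFence, false).size()); // prints 2
    }
}
```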

If we are OK with the remediation in this PR, we can proceed with merging; if we prefer the refactoring, please allow me to send another PR that fixes this more completely.

Author:
Adding the PR that refactors partitionId from short to int so that we can compare the scope of changes:
#15116

+final RootPartitionRange lowFence = new RootPartitionRange(partitionIdLowFence, partitionIdLowFence);
 final RootPartitionRange highFence = new RootPartitionRange(Short.MAX_VALUE, Short.MAX_VALUE);
-return stateMap.subMap(lowFench, false, highFence, false).entrySet().iterator();
+return stateMap.subMap(lowFence, false, highFence, false).entrySet().iterator();
}

/**
@@ -1054,6 +1058,13 @@ private static <T extends Overshadowable<T>> RootPartitionRange of(AtomicUpdateG

private RootPartitionRange(short startPartitionId, short endPartitionId)
{
if (startPartitionId < 0 || endPartitionId < 0) {
log.error(
"PartitionIds [%s], [%s] are possibly out of range of Short.MAX_VALUE, please compact your segments or reduce the number of segments in the time period",
startPartitionId,
endPartitionId
);
}
this.startPartitionId = startPartitionId;
this.endPartitionId = endPartitionId;
}
@@ -165,6 +165,35 @@ public void testFindOvershadowedBy()
);
}

@Test
public void testHandleOutOfRangeFindOvershadowedBy()
{
final List<PartitionChunk<OvershadowableInteger>> expectedOvershadowedChunks = new ArrayList<>();
PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(32001, 32003, 1, 1);
manager.addChunk(chunk);
expectedOvershadowedChunks.add(chunk);
manager.addChunk(newNonRootChunk(32001, 32005, 2, 1));
manager.addChunk(newNonRootChunk(32767, 32768, 3, 1));
manager.addChunk(newNonRootChunk(32768, 32769, 3, 1));

List<AtomicUpdateGroup<OvershadowableInteger>> overshadowedGroups = manager.findOvershadowedBy(
RootPartitionRange.of(32000, 32767),
(short) 4,
State.OVERSHADOWED
);
Assert.assertEquals(
expectedOvershadowedChunks.stream().map(AtomicUpdateGroup::new).collect(Collectors.toList()),
overshadowedGroups
);

overshadowedGroups = manager.findOvershadowedBy(
RootPartitionRange.of(32769, 32769),
(short) 10,
State.VISIBLE
);
Assert.assertTrue(overshadowedGroups.isEmpty());
}

@Test
public void testFindOvershadows()
{