Invalidate cache load status on historical as soon as it is served #14616

Closed

kfaraz wants to merge 17 commits into apache:master from kfaraz:fix_load_status

Conversation

Contributor

kfaraz commented Jul 19, 2023

Description

Follow-up to #11717

This PR attempts to fix the behaviour in the following scenarios:

Case 1: Coordinator gets incorrect response in case of a quick load-drop-load

  1. Historical receives HTTP request to load segment S.
  2. Load succeeds and the cache now has an entry to the effect "LOAD S was SUCCESS".
  3. HTTP request completes with SUCCESS but the entry is still there in the cache.
  4. Historical receives request to drop segment S and the request finishes successfully, but the cache still has the entry "LOAD S was SUCCESS".
  5. Historical receives request to reload segment S. Since we already have an entry in cache, historical incorrectly responds with "LOAD S was SUCCESS" even though the segment has already been dropped from the historical.

Case 2: Coordinator sees a repeatedly failing load as PENDING rather than FAILED

  1. Coordinator requests Historical to load segment.
  2. Request completes while load is still in progress and gets back a PENDING response
  3. Then load fails and FAILED status is cached
  4. Coordinator checks with the Historical if it has loaded the segment.
  5. Since we already have a cached FAILED status, we retry the load and send back a PENDING response instead (see snippet below)
  6. Repeat

// If last load/drop request status is failed, here can try that again
if (status == null || status.get().getState() == Status.STATE.FAILED) {

Case 3: Drop is reported successful but segment files are never deleted

  1. Coordinator requests historical to load segment.
  2. Historical queues load task on executor.
  3. Coordinator requests historical to drop segment and the Historical
    • unannounces the segment immediately
    • marks the drop request as a success in the cache
    • adds an entry to segmentsToDrop
    • schedules drop task on executor for 30s later
  4. Load task starts and removes this segment from segmentsToDrop and proceeds with downloading the segment.
  5. Load task finishes and announces the segment but is not able to update success status since the latest request is a Drop.
  6. Drop task does not do anything since the segment is not present in segmentsToDrop anymore.
  7. Thus the Drop request is considered a success but the segment files are never deleted.
  8. This might cause the Coordinator to assign this Historical beyond its disk capacity.

Similarly, a drop that comes before a load might result in partial or complete deletion of segment files, but the load task might continue to think that it succeeded.

Classes to review

  • SegmentLoadDropHandler
  • SegmentLoadDropHandlerTest

Changes

  • Rename DataSegmentChangeRequestAndStatus to DataSegmentChangeResponse
  • Move DataSegmentChangeResponse out of SegmentLoadDropHandler as it is a POJO returned to the client and need not be tied to a specific loader implementation.
  • Update requestStatuses cache to be a map from DataSegment to the full DataSegmentChangeResponse
  • Remove entry from cache as soon as it is consumed to be sent back to the client (see the sketch after this list)
  • Remove entry from cache if a different type of request is received for the same segment
  • Rename and rearrange some methods for readability
  • Clean up tests and add more test cases for the new behaviour
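
For illustration, a minimal sketch (not the actual Druid code) of the consume-once idea above, using plain strings in place of the real segment and status types:

import java.util.HashMap;
import java.util.Map;

class StatusCacheSketch
{
  enum State { PENDING, SUCCESS, FAILED }

  private final Object lock = new Object();

  private final Map<String, State> requestStatuses = new HashMap<>();

  // Serve a finished (non-PENDING) status exactly once, then remove it,
  // so the quick load-drop-load from Case 1 can never be answered with a
  // stale "LOAD was SUCCESS" entry left over from an earlier request.
  State takeStatus(String segmentId)
  {
    synchronized (lock) {
      State state = requestStatuses.get(segmentId);
      if (state != null && state != State.PENDING) {
        requestStatuses.remove(segmentId);
      }
      return state;
    }
  }
}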

Release note

Fix a bug on the Historical when a load-drop-load of a segment happens in quick succession.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

kfaraz added the Bug label Jul 19, 2023
kfaraz added this to the 27.0 milestone Jul 19, 2023
public void resolve()
{
-  synchronized (statusRefs) {
+  synchronized (requestStatusesLock) {
Contributor

I am trying to understand what the implication of switching from statusRefs to requestStatusesLock is. However, this file doesn't use @GuardedBy and there isn't a comment on requestStatusesLock explaining how it's supposed to be used. Would you be able to add one or both of those, to make it easier to understand the synchronization logic?

Contributor Author

kfaraz commented Jul 19, 2023

Thanks for calling it out. Yes, it is better if I just add a comment explaining the logic.

I made the switch to make sure that the update invoked by requestStatuses.invalidate() later in this method is safeguarded.

The existing synchronization on statusRefs essentially meant that only one thread can be executing the resolve of a given CustomSettableFuture at any point. That condition still holds but with less concurrency, i.e. in the new code, only one thread can be executing the resolve of any CustomSettableFuture at any point.

So, in other words, while multiple segments might be loaded by the historical in parallel, the futures will only be resolved serially.

This will not have any other impact on the correctness or performance of this code.

Contributor

Thanks for the explanation. It looks like there is no longer anything synchronizing on statusRefs, so the new code is somewhat simpler. Please do add a comment to requestStatusesLock, or use @GuardedBy on the things it is meant to guard, as appropriate, in order to make the logic easier to follow for others.

Contributor Author

Added an @GuardedBy for requestStatuses and some comments.
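
For reference, a minimal sketch of the @GuardedBy pattern discussed above, assuming the Error Prone annotation; the field and method names are illustrative, not the actual Druid code:

import com.google.errorprone.annotations.concurrent.GuardedBy;

import java.util.HashMap;
import java.util.Map;

class RequestStatusHolder
{
  private final Object requestStatusesLock = new Object();

  // Every read or write of this map must hold requestStatusesLock;
  // the annotation lets static analysis flag unguarded access.
  @GuardedBy("requestStatusesLock")
  private final Map<String, String> requestStatuses = new HashMap<>();

  void put(String segmentId, String status)
  {
    synchronized (requestStatusesLock) {
      requestStatuses.put(segmentId, status);
    }
  }

  String get(String segmentId)
  {
    synchronized (requestStatusesLock) {
      return requestStatuses.get(segmentId);
    }
  }
}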

);
final List<DataSegmentChangeRequestAndStatus> result = new ArrayList<>(statusRefs.size());
statusRefs.forEach((request, statusRef) -> {
// Remove complete statuses from the cache
Contributor

When is this resolve method called exactly? Is it possible that these invalidations are too late, i.e. is it still possible that an even faster load->drop->load would be racy?

Put another way: would it make sense to invalidate the cache on receipt of the drop, rather than here?

Contributor Author

kfaraz commented Jul 19, 2023

As long as the coordinator doesn't give up on a request it has sent to the historical, we should be good.
i.e. once the coordinator has asked a historical to LOAD a segment, it should not ask the historical to DROP the same segment until the LOAD has failed or succeeded.

Otherwise, we might keep the cached value of LOAD success around and serve it to the second load request which comes after the drop.

Contributor Author

This code could probably do with some cleanup, doing what was originally suggested in this comment, i.e. maintaining the cache (or really just a concurrent map) at the DataSegment level itself rather than at the DataSegmentChangeRequest level.

But I wanted to keep the footprint of this PR low so that we could include it in Druid 27 release. We can take up a more holistic overhaul later. Let me know what you think.

Contributor

once the coordinator has asked a historical to LOAD a segment, it should not ask the historical to DROP the same segment until the LOAD has failed or succeeded.

Could this happen after Coordinator leader change or restart?

Contributor Author

Yes, I feel those are the only cases when this is likely to happen.

Contributor Author

kfaraz commented Jul 20, 2023

Also moved the class DataSegmentChangeResponse out into a file of its own.

Contributor Author

kfaraz commented Jul 21, 2023

@gianm, there is another case in which the current code behaves weirdly. If a load keeps failing repeatedly on a historical, the coordinator may always get a PENDING response rather than FAILED.

Typical scenario:

  • Coordinator requests Historical to load segment.
  • Request completes while load is still in progress and gets back a PENDING response
  • Then load fails and FAILED status is cached
  • Coordinator checks with the Historical if it has loaded the segment.
  • Since we already have a cached FAILED status, we retry the load and send back a PENDING response instead (see snippet below)
  • Repeat

// If last load/drop request status is failed, here can try that again
if (status == null || status.get().getState() == Status.STATE.FAILED) {

Contributor Author

kfaraz commented Jul 21, 2023

Trying to handle all these cases, I have made the following changes:

  • Update the cache to be a map from DataSegment to the full DataSegmentChangeResponse
  • Invalidate the cache if a different request type is received
  • Rearrange and rename some methods to simplify the code
  • Add javadocs where applicable
  • Clean up SegmentLoadDropHandlerTest and add some more test cases

@gianm , let me know what you think.

if (segmentsToDrop.remove(segment)) {
  segmentManager.dropSegment(segment);

  File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString());

Check failure (Code scanning / CodeQL): Uncontrolled data used in path expression
This path depends on a user-provided value.
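
As an aside, a common mitigation for this class of CodeQL finding, purely illustrative and not something this PR does, is to canonicalize the resulting path and verify it stays inside the intended directory:

import java.io.File;
import java.io.IOException;

class SegmentInfoPaths
{
  // Canonicalize the candidate file and confirm it stays inside the
  // intended directory before touching it; reject anything that escapes.
  static File segmentInfoFile(File infoDir, String segmentId) throws IOException
  {
    File candidate = new File(infoDir, segmentId).getCanonicalFile();
    String root = infoDir.getCanonicalPath() + File.separator;
    if (!candidate.getPath().startsWith(root)) {
      throw new IOException("Segment id escapes info directory: " + segmentId);
    }
    return candidate;
  }
}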
Contributor Author

kfaraz commented Jul 24, 2023

There is one more case that requires handling, a concurrent load and drop of the same segment. I am currently working on this, will update the PR soon.

Contributor

gianm commented Jul 24, 2023

concurrent load and drop of the same segment

When would this occur?

Contributor Author

kfaraz commented Jul 24, 2023

When would this occur?

Only if the client (Coordinator) sends a LOAD request and then sends a DROP request for the same segment while the load is still in progress. There is already a list of segmentsToDrop but that might be ineffective in some cases.

Proposed fix

Better handling of the segmentsToDrop list; a sketch follows the list below.

  • In processRequest
    • add an entry to segmentsToDrop if request is drop
    • remove entry from segmentsToDrop if request is load
  • In loadAndAnnounceSegment
    • proceed with load, announce and/or updating the status only if there is no entry in segmentsToDrop
  • In dropSegment
    • proceed with drop only if there is an entry in segmentsToDrop
    • clear entry from segmentsToDrop only after finishing the drop
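
A rough sketch of this bookkeeping (a plain String id stands in for DataSegment, and the method names mirror the list above; the details are illustrative, not the actual implementation):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class DropIntentTracker
{
  private final Set<String> segmentsToDrop = ConcurrentHashMap.newKeySet();

  // processRequest: a DROP registers intent, a LOAD cancels any pending drop.
  void processRequest(String segmentId, boolean isDrop)
  {
    if (isDrop) {
      segmentsToDrop.add(segmentId);
    } else {
      segmentsToDrop.remove(segmentId);
    }
  }

  // loadAndAnnounceSegment: proceed only if no drop has been requested since.
  boolean mayLoadAndAnnounce(String segmentId)
  {
    return !segmentsToDrop.contains(segmentId);
  }

  // dropSegment: drop only if the intent is still registered, and clear the
  // entry only after the drop has actually finished.
  void dropSegment(String segmentId, Runnable doDrop)
  {
    if (segmentsToDrop.contains(segmentId)) {
      doDrop.run();
      segmentsToDrop.remove(segmentId);
    }
  }
}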

Typical scenario

  • Coordinator asks historical to load segment
    • Historical queues LOAD task on executor
  • Coordinator asks historical to drop segment. The Historical then
    • unannounces the segment immediately
    • adds an entry to segmentsToDrop
    • schedules DROP task on executor for 30s later
  • LOAD task starts and removes this segment from segmentsToDrop and proceeds with downloading the segment
  • LOAD task finishes and announces the segment but is not able to update success status since the latest request is a DROP.
  • DROP task does not do anything since the segment is not present in segmentsToDrop anymore. Thus the result holder remains stuck in pending state.
  • DROP request would be marked as successful without having deleted the segment files.
  • This may lead to the Coordinator assigning this Historical beyond its disk capacity.
    - All subsequent DROP requests will be answered with pending (at least until the cache entry expires) without ever actually trying to drop the segment.
    - Only a subsequent LOAD request will clear this state

Similarly, a DROP that comes before a LOAD might result in partial or complete deletion of segment files, but the LOAD task might continue to think that it succeeded.

Contributor

gianm commented Jul 24, 2023

@kfaraz Thanks for outlining the scenario. IMO, it makes sense to include this fix, since it seems straightforward.

Contributor

gianm left a comment

Most of the changes LGTM. I had a couple of questions.

loaded = segmentManager.loadSegment(
    segment,
    lazy,
    () -> unannounceAndDropSegment(segment, DataSegmentChangeCallback.NOOP),
Contributor

I take it all these changes from removeSegment to unannounceAndDropSegment are so we do an immediate drop (rather than a delayed drop) in the case of failed loads? If so, that seems right to me. No reason to delay the drop if we never loaded and announced the segment in the first place.

Contributor Author

Yes, that is the intention here. But just to clarify, this was already the case and has not been introduced by this PR. I have only renamed the methods to remove ambiguity.

* from the cache.</li>
* </ol>
* <p>
* Maximum size of this cache must be significantly greater than the number of
Contributor

What happens if it's not significantly greater?

I guess this could happen if someone sets druid.coordinator.loadqueuepeon.http.batchSize on the Coordinator higher than druid.segmentCache.statusQueueMaxSize on the Historical.

Contributor Author

If the Coordinator sends a batch larger than the cache size, the Historical would not be able to have an entry for some of the requests in the cache. These requests would still be processed but their finished statuses would never be updated in the cache and thus never sent back to the Coordinator.

The Coordinator would end up retrying these requests and the Historical might duplicate some work but there would never be any correctness issue, since we are not sending any response to the Coordinator for the un-cached requests.

Given that the default cache size is 100 and the default request batch size is 1, we should be okay. We can update the docs to mention that the cache size must be at least 10x the batch size.
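
For context, a hypothetical bounded cache along these lines, built with Guava's CacheBuilder; the size is the default mentioned above, not a recommendation:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

class BoundedStatusCache
{
  // Entries beyond maximumSize are evicted, so a batch larger than the cache
  // can lose finished statuses; those requests are simply retried by the
  // Coordinator, and no incorrect response is ever served in the meantime.
  final Cache<String, String> requestStatuses = CacheBuilder.newBuilder()
      .maximumSize(100) // the default cache size mentioned above
      .build();
}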

Contributor Author

Interestingly, druid.segmentCache.statusQueueMaxSize is not documented yet.

Contributor

Where does the 10x factor come from?

Contributor Author

kfaraz commented Jul 25, 2023

Where does the 10x factor come from?

Just a safe-side measure in case the Coordinator sends multiple batches of different requests in quick succession, such that the total number of unique requests exceeds the cache size. Not really based on any concrete calculations. 😅

Contributor Author

Some more thoughts on this:

  • The Cache can really just be a Map. The only advantage the Cache might offer is auto-expiry of requests that the Coordinator gave up on, but I don't think that is currently being used in the code anyway.
  • Maybe we shouldn't even have a config for batchSize. The Coordinator would start by sending a single request to the historicals, and in each HTTP response, each Historical could indicate how many requests it would like to take in the next batch. This could be based on the number of loading threads, the number of pending requests, etc.
  • Given that the number of pending requests is always going to be small, we should just get rid of the cacheSize config and hard-code it to something like 1000. And if in any case, the Coordinator sends more requests than a Historical can handle, it immediately fails the request with an appropriate error message.

What do you think?
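
An entirely hypothetical sketch of that second idea, where each response carries a hint for the next batch; all names here are made up for illustration:

import java.util.List;

class ChangeRequestBatchResponse
{
  final List<String> statuses;  // stand-in for the real per-request statuses
  final int nextBatchSize;      // e.g. free loading threads minus queued work

  ChangeRequestBatchResponse(List<String> statuses, int nextBatchSize)
  {
    this.statuses = statuses;
    this.nextBatchSize = Math.max(1, nextBatchSize); // always invite at least one
  }
}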

Member

I would vote for removing the cache structure. As discussed internally, if we are relying on HTTP status codes for correctness, I don't think relying on a server-side cache structure is fully reliable.
Another easy solution, once a defined number of queued requests is reached, could be to return a retrying HTTP response so that, if needed, the load queue peon can retry that request after some time.

Contributor

The Cache can really just be a Map. The only advantage the Cache might offer is auto-expiry of requests that the Coordinator gave up on, but I don't think that is currently being used in the code anyway.

The Coordinator could crash and never follow up on a request, so it's still useful to expire stuff.

Maybe we shouldn't even have a config for batchSize. The Coordinator would start by sending a single request to the historicals and in each HTTP response, each Historical could indicate how many requests it would like to take in the next batch. This could be based on the number of loading threads, number of pending requests.

Seems like a good idea! In addition, in case the Coordinator sends more commands than the Historical asks for, the Historical could reject the extra commands. (Although I am not sure if the protocol allows this.)

Given that the number of pending requests is always going to be small, we should just get rid of the cacheSize config and hard-code it to something like 1000.

Sounds good to me. Fewer configs are good.

Btw, I am wondering now, what are the scenarios where the Historical might receive multiple copies of the same load or drop command? Is this something that happens in typical operation, or in some kind of abnormal case?

Contributor Author

Btw, I am wondering now, what are the scenarios where the Historical might receive multiple copies of the same load or drop command?

Do you mean the same action on same segment or different actions on same segment? Both of these can actually happen under normal conditions:

Same action on same segment

Even under normal operation, the Coordinator keeps sending the same request to a Historical until the request finishes.
Essentially, the Coordinator sends a batch of requests to the Historical and waits for a response. The Historical sends back a response as soon as any one of the requests in this batch (or even in a previously submitted batch which is still being tracked) finishes. So chances are that the Coordinator gets back a response with one request having status SUCCESS or FAILED and the others PENDING, because the Historical was still working on them when it completed the HTTP request. The Coordinator then re-sends the requests for which it had received a PENDING status. By then, the Historical might have already finished them and cached their results.

Different actions on same segment but after a delay

The case for a different action (i.e. load after drop) without ever having read the status of the previous action can happen during coordinator restarts or leader re-elections. Although, in these cases, it is not likely that the drop and load would happen concurrently.

Different actions on same segment happening concurrently

This is not expected to happen under normal conditions, and can only be caused by the Coordinator being in some weird state or a bug in the code.

Contributor

Even under normal operation, the Coordinator keeps sending the same request to a Historical until the request finishes.

Ah, thanks for explaining. I didn't realize it was that common.

@rohangarg
Member

A couple of doubts:

  1. DROP task does not do anything since the segment is not present in segmentsToDrop anymore. Thus the result holder remains stuck in pending state.

Are we sure that the DROP request actually gets stuck in pending state? It appears from the current code (https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java#L463) that the status is marked success after either the delete request is scheduled async or run inline.

  2. Since we changed the return type object of the HTTP call, will it be backwards compatible? For instance, if the historicals are updated before the coordinator (as we suggest), could it happen that the historicals return new object types which an old coordinator wouldn't be able to understand?

Contributor Author

kfaraz commented Jul 25, 2023

Are we sure that the DROP request actually gets stuck in pending state? It appears from the current code (https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java#L463) that the status is marked success after either the delete request is scheduled async or run inline.

Yes, @rohangarg, thanks for pointing this out. I have updated my comment. The DROP request is marked a success immediately after unannouncing the segment. Repeated back and forth in the concurrency cases of this code got me turned around a bit. 😅

Even though this particular case is probably not that big of an issue (marking a segment as drop success even though the files are still there), it might lead to the coordinator assigning historicals beyond their disk capacity.

I have tried to handle such cases now though and they should work as expected. Please let me know if you have any further thoughts.

Since we changed the return type object of the HTTP call, will it be backwards compatible? For instance if the historicals are updated before coordinator (as we suggest), could it happen that the historicals return new object types which an old coordinator wouldn't be able to understand?

The response payload has not changed; only the Java class name has changed. So the changes are backward compatible. Please let me know if you notice a discrepancy in this.
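
For illustration, a toy Jackson example (not the actual Druid classes) of why renaming the Java class leaves the wire format unchanged:

import com.fasterxml.jackson.databind.ObjectMapper;

class DataSegmentChangeResponse // formerly DataSegmentChangeRequestAndStatus
{
  public String request = "LOAD";
  public String status = "SUCCESS";
}

class WireFormatDemo
{
  public static void main(String[] args) throws Exception
  {
    // Prints {"request":"LOAD","status":"SUCCESS"}: Jackson derives the JSON
    // from the properties, not from the Java class name, so a rename alone
    // leaves the wire format untouched.
    System.out.println(new ObjectMapper().writeValueAsString(new DataSegmentChangeResponse()));
  }
}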

kfaraz added the WIP label Jul 27, 2023
gianm removed this from the 27.0 milestone Jul 27, 2023
Contributor

gianm commented Jul 27, 2023

Removed from 27.0 in favor of #14670.

Contributor Author

kfaraz commented Jul 28, 2023

Closing this as this implementation is still buggy and the design requires some further thought.
I will create a fresh PR soon.

kfaraz closed this Jul 28, 2023