Fix race between canHandle() and addSegment() in StorageLocation #8114
leventov merged 8 commits into apache:master
Conversation
|
@jihoonson please don't remove "this PR has been reviewed with concurrency checklist" item from the list since this PR does have relation to concurrency. |
|
@leventov added back. |
|
|
||
| public boolean canHandle(DataSegment segment) | ||
| /** | ||
| * This method is available for only unit tests. Production code must use {@link #reserve} instead. |
This javadoc is semi confusing because reserve calls canHandle?
Tried to make it clear.
| * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers | ||
| * and phase 2 tasks read those files via HTTP. | ||
| * | ||
| * <p> |
nit: were these <p> added accidentally?
Hmm, these are actually inserted by Intellij's auto indentation based on the code style. It seems legit?
Hmm, I think there may be some reason why this isn't in the code style. I remember being told to remove these in the past and to manually adjust IntelliJ to not insert them. I'll see if I can find the reason.
I think the reason we usually avoid them is that they look silly, and also aren't adding anything useful since Druid javadocs are mostly read in source form anyway. It's not a big deal either way. Maybe one day someone will add a style rule for it and get rid of them all.
| * Remove a segment dir from this location. The segment size is subtracted from currSize. | ||
| */ | ||
| public synchronized void addSegmentDir(File segmentDir, DataSegment segment) | ||
| public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) |
After thinking about it a bit, it doesn't seem particularly beneficial for StorageLocation to have to be aware of DataSegment and SegmentId, since DataSegment is only used as a shorthand to get the size and SegmentId, and the SegmentId is only used for logging. It seems like reworking this might make the implementation of StorageLocation a bit more simple and allow IntermediaryDataManager and SegmentLoaderLocalCacheManager to use the same reserve and remove methods. I would consider this optional however, up to you if you want to investigate.
I already looked into that approach. The thing is, StorageLocation is used differently in IntermediaryDataManager and SegmentLoaderLocalCacheManager. In IntermediaryDataManager, the compressed segment files are stored and registered in StorageLocation. In SegmentLoaderLocalCacheManager, the uncompressed segment directories are registered in StorageLocation. This led to different size computations in removeFile(File) and removeSegmentDir(File, DataSegment). I wanted to make sure that the caller is aware of what it's registering and calls the right method.
I guess we could have an abstract StorageLocation class and its two implementations for different use cases. But I'm not sure it's super beneficial at this point because StorageLocation is still pretty simple.
Thanks for the explanation, sgtm 👍
| return; | ||
| } | ||
| } | ||
| catch (Exception e) { |
Can catch narrower exception type?
This is to print something in the log file so that operators could notice that something happened.
This is generally considered an antipattern to catch everything just in case unless the exception is rethrown in the catch block. Could catch only IOExceptions?
Sorry, I was confused with what part you were saying. It sounds good. Fixed.
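The suggestion in this thread can be illustrated with a minimal sketch. It is not the code from this PR: `NarrowCatchSketch`, `Loader`, and `loadIntoFirstWorking` are hypothetical names, and locations are plain strings. It shows a loop that logs an IOException and moves on to the next location, while any other exception (for example a programming error) still propagates to the caller.

```java
import java.io.IOException;
import java.util.List;

class NarrowCatchSketch
{
  // Hypothetical callback that loads a segment into the given location.
  interface Loader
  {
    void loadInto(String location) throws IOException;
  }

  // Returns the first location that loaded successfully, or null if all failed.
  static String loadIntoFirstWorking(List<String> locations, Loader loader, List<String> log)
  {
    for (String location : locations) {
      try {
        loader.loadInto(location);
        return location;
      }
      catch (IOException e) {
        // Only log, so that the remaining locations are still tried.
        log.add("Failed to load into " + location + ": " + e.getMessage());
      }
    }
    return null;
  }
}
```

Because only IOException is caught, an unexpected RuntimeException is not silently swallowed by the loop.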
| } | ||
| } | ||
| catch (Exception e) { | ||
| // Print only log here to try other locations as well. |
"Print only log" - a strange phrase. Perhaps it should be just "Only log"
|
|
||
| private final File path; | ||
| private final long maxSize; | ||
| private final long maxSize; // in bytes |
Can call this field maxSizeBytes
| private final long maxSize; // in bytes | ||
| private final long freeSpaceToKeep; | ||
|
|
||
| // Set of files stored under the given path. All accesses must be synchronized with currSize. |
Please turn this comment into a Javadoc comment.
| private final Set<File> files = new HashSet<>(); | ||
|
|
||
| private volatile long currSize = 0; | ||
| // Current total size of files in bytes. All accesses must be synchronized with files. | ||
| private long currSize = 0; |
"All accesses must be synchronized with files." can be replaced with @GuardedBy("this"). Other fields in this class should be annotated, too. See https://github.com/code-review-checklists/java-concurrency#guarded-by
Can call this field currSizeBytes
Good convention. Added.
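As a rough illustration of the two suggestions in this thread (the Bytes suffix and @GuardedBy), here is a minimal sketch. It is not the actual StorageLocation: `GuardedFieldsSketch` is a hypothetical class, and the `GuardedBy` annotation is declared locally only so the snippet compiles without extra jars. Real code would import the annotation from a library such as javax.annotation.concurrent (JSR-305).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashSet;
import java.util.Set;

// Local stand-in (assumption) for a library-provided GuardedBy annotation.
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.FIELD, ElementType.METHOD})
@interface GuardedBy
{
  String value();
}

class GuardedFieldsSketch
{
  /** Files registered in this location. */
  @GuardedBy("this")
  private final Set<String> files = new HashSet<>();

  /** Current total size of the registered files in bytes. */
  @GuardedBy("this")
  private long currSizeBytes = 0;

  // Both fields are only touched while holding the monitor of "this".
  synchronized void add(String file, long sizeBytes)
  {
    if (files.add(file)) {
      currSizeBytes += sizeBytes;
    }
  }

  synchronized long currSizeBytes()
  {
    return currSizeBytes;
  }
}
```

The annotation documents the locking rule next to each field, so a free-text sentence about synchronization in the Javadoc becomes unnecessary.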
| boolean canHandle(SegmentId segmentId, long segmentSize) | ||
| { | ||
| if (available() < segment.getSize()) { | ||
| if (available() < segmentSize) { |
I suggest to call this method "availableSizeBytes"
| * This method is available for only unit tests. Production code must use {@link #reserve} instead. | ||
| */ | ||
| @VisibleForTesting | ||
| boolean canHandle(SegmentId segmentId, long segmentSize) |
This method should be annotated @GuardedBy("this"). See https://github.com/code-review-checklists/java-concurrency#guarded-by
|
@leventov do you have more comments on this pr? |
| // Current total size of files in bytes. All accesses must be synchronized with files. | ||
| private long currSize = 0; | ||
| /** | ||
| * Current total size of files in bytes. All accesses must be synchronized with files. |
"All accesses must be synchronized with files." sentence is strange. It seems to me that it should be just removed, GuardedBy already says everything needed.
|
|
||
| // Set of files stored under the given path. All accesses must be synchronized with currSize. | ||
| /** | ||
| * Set of files stored under the given path. All accesses must be synchronized with currSizeBytes. |
Same about "All accesses must be synchronized with currSizeBytes."
| private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); | ||
| private static final Comparator<StorageLocation> COMPARATOR = (left, right) -> | ||
| Longs.compare(right.available(), left.available()); | ||
| Longs.compare(right.availableSizeBytes(), left.availableSizeBytes()); |
Comparator.comparingLong(StorageLocation::availableSizeBytes).reversed() would be clearer
| private static final Comparator<StorageLocation> COMPARATOR = (left, right) -> | ||
| Longs.compare(right.availableSizeBytes(), left.availableSizeBytes()); | ||
| private static final Comparator<StorageLocation> COMPARATOR = Comparator | ||
| .comparing(StorageLocation::availableSizeBytes) |
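To make the comparator suggestion concrete, the sketch below compares the two forms on a stand-in type. `ComparatorSketch` and its nested `Location` class are hypothetical, and `Long.compare` from the JDK is used in place of Guava's `Longs.compare` to keep the example free of dependencies. Both comparators order locations by available bytes, largest first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class ComparatorSketch
{
  // Hypothetical minimal type exposing only the accessor used for ordering.
  static final class Location
  {
    private final String name;
    private final long availableSizeBytes;

    Location(String name, long availableSizeBytes)
    {
      this.name = name;
      this.availableSizeBytes = availableSizeBytes;
    }

    String name()
    {
      return name;
    }

    long availableSizeBytes()
    {
      return availableSizeBytes;
    }
  }

  // Lambda form: swapping the arguments yields descending order.
  static final Comparator<Location> BY_LAMBDA =
      (left, right) -> Long.compare(right.availableSizeBytes(), left.availableSizeBytes());

  // Key-extractor form: comparingLong avoids boxing; reversed() makes it descending.
  static final Comparator<Location> BY_KEY =
      Comparator.comparingLong(Location::availableSizeBytes).reversed();

  // Sorts a copy of the input and returns the names in sorted order.
  static List<String> sortedNames(List<Location> locations, Comparator<Location> comparator)
  {
    List<Location> copy = new ArrayList<>(locations);
    copy.sort(comparator);
    List<String> names = new ArrayList<>();
    for (Location location : copy) {
      names.add(location.name());
    }
    return names;
  }
}
```

The key-extractor form states the sort key and the direction explicitly, which is arguably easier to read than spotting the swapped `left` and `right` arguments.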
Description
Historicals can now use multiple threads to load segments in parallel from deep storage (#7088, #4966). This could lead to an incorrect computation of the remaining space in StorageLocation.

The current pattern for using StorageLocation is to call canHandle() and then addSegment(). Even though each of canHandle() and addSegment() is synchronized, the two calls are not executed atomically, which could lead to a wrong estimate of the available space.

This PR fixes this problem by adding a new method, reserve(), to StorageLocation. reserve() basically does what canHandle() and addSegment() do, but atomically. If an error occurs after reserve(), the caller should call remove(segment) to release the reserved space.

This PR has:
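The race and its fix can be sketched with a simplified stand-in class. This is not the Druid implementation: `SketchStorageLocation`, its string-keyed file set, and the `release` method name are assumptions made for illustration. The point is that a check and an update that are each synchronized can still interleave with another thread, while a single synchronized `reserve()` cannot.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for StorageLocation (hypothetical; not the Druid class).
class SketchStorageLocation
{
  private final long maxSizeBytes;
  private final Set<String> files = new HashSet<>();
  private long currSizeBytes = 0;

  SketchStorageLocation(long maxSizeBytes)
  {
    this.maxSizeBytes = maxSizeBytes;
  }

  synchronized long availableSizeBytes()
  {
    return maxSizeBytes - currSizeBytes;
  }

  // Racy when followed by a separate add(): another thread may add in between.
  synchronized boolean canHandle(long sizeBytes)
  {
    return availableSizeBytes() >= sizeBytes;
  }

  synchronized void add(String file, long sizeBytes)
  {
    if (files.add(file)) {
      currSizeBytes += sizeBytes;
    }
  }

  // Check and update under one lock acquisition, so no other thread can interleave.
  // Rejecting an already registered file is a choice of this sketch.
  synchronized boolean reserve(String file, long sizeBytes)
  {
    if (files.contains(file) || !canHandle(sizeBytes)) {
      return false;
    }
    files.add(file);
    currSizeBytes += sizeBytes;
    return true;
  }

  // Releases space reserved earlier, for example after a failed download.
  synchronized void release(String file, long sizeBytes)
  {
    if (files.remove(file)) {
      currSizeBytes -= sizeBytes;
    }
  }
}
```

In this sketch a caller would invoke `reserve()` first, attempt the download only if it returns true, and call `release()` if the download then fails, mirroring the reserve-then-remove contract described above.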