Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,40 @@ private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
if (localStorageDir.exists()) {
return location;
if (checkSegmentFilesIntact(localStorageDir)) {
log.warn("[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.", localStorageDir.getAbsolutePath());
cleanupCacheFiles(location.getPath(), localStorageDir);
location.removeSegmentDir(localStorageDir, segment);
break;
} else {
return location;
}
}
}
return null;
}

/**
* check data intact.
* @param dir segments cache dir
* @return true means segment files may be damaged.
*/
private boolean checkSegmentFilesIntact(File dir)
{
return checkSegmentFilesIntactWithStartMarker(dir);
}

/**
* If there is 'downloadStartMarker' existed in localStorageDir, the segments files might be damaged.
* Because each time, Druid will delete the 'downloadStartMarker' file after pulling and unzip the segments from DeepStorage.
* downloadStartMarker existed here may mean something error during download segments and the segment files may be damaged.
*/
private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir)
{
final File downloadStartMarker = new File(localStorageDir.getPath(), "downloadStartMarker");
return downloadStartMarker.exists();
}

@Override
public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException
{
Expand Down Expand Up @@ -173,6 +201,12 @@ public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadi
return factory.factorize(segment, segmentFiles, lazy);
}

/**
* Make sure segments files in loc is intact, otherwise function like loadSegments will failed because of segment files is damaged.
* @param segment
* @return
* @throws SegmentLoadingException
*/
@Override
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,10 @@ public ListenableFuture<List<DataSegmentChangeRequestAndStatus>> processBatch(Li
private AtomicReference<Status> processRequest(DataSegmentChangeRequest changeRequest)
{
synchronized (requestStatusesLock) {
if (requestStatuses.getIfPresent(changeRequest) == null) {
AtomicReference<Status> status = requestStatuses.getIfPresent(changeRequest);

// If last load/drop request status is failed, here can try that again
if (status == null || status.get().getState() == Status.STATE.FAILED) {
changeRequest.go(
new DataSegmentChangeHandler()
{
Expand Down