diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 2aa203824883..de9bff151693 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -137,12 +137,40 @@ private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
     for (StorageLocation location : locations) {
       File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
       if (localStorageDir.exists()) {
-        return location;
+        if (checkSegmentFilesIntact(localStorageDir)) {
+          log.warn("[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.", localStorageDir.getAbsolutePath());
+          cleanupCacheFiles(location.getPath(), localStorageDir);
+          location.removeSegmentDir(localStorageDir, segment);
+          break;
+        } else {
+          return location;
+        }
       }
     }
     return null;
   }
 
+  /**
+   * Checks whether the cached segment files in {@code dir} may be damaged; delegates to the download-start-marker check.
+   * @param dir segment cache directory to inspect
+   * @return true if the segment files may be damaged (note: the opposite of what the method name suggests), false otherwise.
+   */
+  private boolean checkSegmentFilesIntact(File dir)
+  {
+    return checkSegmentFilesIntactWithStartMarker(dir);
+  }
+
+  /**
+   * Returns true if a 'downloadStartMarker' file exists in localStorageDir, which means the segment files might be damaged.
+   * Druid deletes the 'downloadStartMarker' file each time it finishes pulling and unzipping a segment from DeepStorage,
+   * so a marker that still exists here suggests an error occurred during the download and the segment files may be incomplete.
+   */
+  private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir)
+  {
+    final File downloadStartMarker = new File(localStorageDir.getPath(), "downloadStartMarker");
+    return downloadStartMarker.exists();
+  }
+
   @Override
   public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException
   {
@@ -173,6 +201,12 @@ public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadi
     return factory.factorize(segment, segmentFiles, lazy);
   }
 
+  /**
+   * Makes sure the segment files in the storage location are intact; otherwise callers such as loadSegments would fail on damaged files.
+   * @param segment the segment whose files are requested
+   * @return the segment's files as a {@link File} (presumably the local cache directory; method body not shown here, confirm)
+   * @throws SegmentLoadingException presumably when the segment files cannot be obtained (method body not shown here, confirm)
+   */
   @Override
   public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 33d4df915140..0cb0ec0218b8 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -514,7 +514,10 @@ public ListenableFuture> processBatch(Li
   private AtomicReference processRequest(DataSegmentChangeRequest changeRequest)
   {
     synchronized (requestStatusesLock) {
-      if (requestStatuses.getIfPresent(changeRequest) == null) {
+      AtomicReference status = requestStatuses.getIfPresent(changeRequest);
+
+      // Run the request when no status is recorded for it yet, or retry it when the last load/drop attempt ended in FAILED
+      if (status == null || status.get().getState() == Status.STATE.FAILED) {
         changeRequest.go(
             new DataSegmentChangeHandler()
             {