From 57a9af3c9997b628304af49c0b623141d81e4be0 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Wed, 2 Dec 2020 11:10:32 +0800 Subject: [PATCH 1/8] load segments with segment files check --- .../SegmentLoaderLocalCacheManager.java | 26 +++++++++++++++++- .../SegmentLoadDropHandlerTest.java | 27 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 80e8dc6015f2..579db466b204 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -119,6 +119,22 @@ private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) return null; } + /** + * check data intact. + * @param dir segments cache dir + * @return true means segment files may be damaged. + */ + private boolean checkSegmentFilesIntact(File dir) + { + return checkSegmentFilesIntactWithStartMarker(dir); + } + + private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir) + { + final File downloadStartMarker = new File(localStorageDir.getPath(), "downloadStartMarker"); + return downloadStartMarker.exists(); + } + @Override public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException { @@ -157,7 +173,15 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException try { StorageLocation loc = findStorageLocationIfLoaded(segment); String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); - + if (loc != null) { + File localStorageDir = new File(loc.getPath(), storageDir); + if (checkSegmentFilesIntact(localStorageDir)) { + log.warn("[%s] may be damaged. 
Delete all the segment files and pull from DeepStorage again.", localStorageDir.getAbsolutePath()); + cleanupCacheFiles(loc.getPath(), localStorageDir); + loc.removeSegmentDir(localStorageDir, segment); + loc = null; + } + } if (loc == null) { loc = loadSegmentWithRetry(segment, storageDir); } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 32368b700c68..98bdbbcfb779 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -314,6 +314,33 @@ Because another addSegment() call is executed, which removes the segment from se segmentLoadDropHandler.stop(); } + @Test + public void testSegmentLoading3() throws Exception + { + segmentLoadDropHandler.start(); + String dataSource = "test"; + String version = "1"; + String interval = "P1d/2011-04-01"; + + final DataSegment segment = makeSegment(dataSource, version, Intervals.of(interval)); + + // manually create a local segment under localStorageFolder + final File localSegmentFile = new File( + infoDir, + dataSource + interval + version + "/0" + ); + localSegmentFile.mkdirs(); + final File indexZip = new File(localSegmentFile, "index.zip"); + indexZip.createNewFile(); + final File downloadStartMarker = new File(localSegmentFile, "downloadStartMarker"); + downloadStartMarker.createNewFile(); + + segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP); + Assert.assertFalse(downloadStartMarker.exists()); + + segmentLoadDropHandler.stop(); + } + @Test public void testLoadCache() throws Exception { From 06679f761d54eba00847980759afde11b4f8117c Mon Sep 17 00:00:00 2001 From: yuezhang Date: Wed, 2 Dec 2020 11:18:59 +0800 Subject: [PATCH 2/8] add more java docs --- .../segment/loading/SegmentLoaderLocalCacheManager.java | 
5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 579db466b204..746f33c2fafd 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -129,6 +129,11 @@ private boolean checkSegmentFilesIntact(File dir) return checkSegmentFilesIntactWithStartMarker(dir); } + /** + * If there is 'downloadStartMarker' existed in localStorageDir, then the segments files may be damaged. + * Because each time, Druid will delete the 'downloadStartMarker' file after pulling and unzip the segments from DeepStorage. + * downloadStartMarker existed here may mean something error during download segments and the segment files may be damaged. + */ private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir) { final File downloadStartMarker = new File(localStorageDir.getPath(), "downloadStartMarker"); From 90540f5e00dbaba3a9c29449e655ba1920965370 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 7 Dec 2020 17:34:26 +0800 Subject: [PATCH 3/8] done --- .../druid/server/coordination/SegmentLoadDropHandler.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index e9d2f075c34e..09cc26d7f22d 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -507,7 +507,10 @@ public ListenableFuture> processBatch(Li private AtomicReference processRequest(DataSegmentChangeRequest changeRequest) { synchronized (requestStatusesLock) { - if 
(requestStatuses.getIfPresent(changeRequest) == null) { + AtomicReference status = requestStatuses.getIfPresent(changeRequest); + + // If the last load/drop request failed, it can be retried here + if (status == null || status.get().getState() == Status.STATE.FAILED) { changeRequest.go( new DataSegmentChangeHandler() { From b23a56803d563635b18acb733d49b72ccd591c56 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 7 Dec 2020 17:39:50 +0800 Subject: [PATCH 4/8] add java docs --- .idea/misc.xml | 4 ++-- .../segment/loading/SegmentLoaderLocalCacheManager.java | 8 +++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index bf2061d7392d..fe39ed623c9c 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -84,7 +84,7 @@ - + - + \ No newline at end of file diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 746f33c2fafd..4746af508ec5 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -130,7 +130,7 @@ private boolean checkSegmentFilesIntact(File dir) } /** - * If there is 'downloadStartMarker' existed in localStorageDir, then the segments files may be damaged. + * If a 'downloadStartMarker' file exists in localStorageDir, the segment files might be damaged. * Because each time, Druid will delete the 'downloadStartMarker' file after pulling and unzip the segments from DeepStorage. * downloadStartMarker existed here may mean something error during download segments and the segment files may be damaged.
*/ @@ -170,6 +170,12 @@ public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadi return factory.factorize(segment, segmentFiles, lazy); } + /** + * Make sure the segment files in loc are intact; otherwise functions like loadSegments will fail because the segment files are damaged. + * @param segment + * @return + * @throws SegmentLoadingException + */ @Override public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException { From 89fac7c14d0a9d370ec6c547d9b5fb2b65d74e1e Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 7 Dec 2020 17:40:25 +0800 Subject: [PATCH 5/8] revert misc --- .idea/misc.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index fe39ed623c9c..bf2061d7392d 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -84,7 +84,7 @@ - + - \ No newline at end of file + From 5c745d601dd5491ffec721eff9d82dcbb9f3de2a Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 7 Dec 2020 19:43:43 +0800 Subject: [PATCH 6/8] resolve ci failures --- .../SegmentLoadDropHandlerTest.java | 31 +------------------ 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 98bdbbcfb779..486f8b528614 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -31,9 +31,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.loading.CacheTestSegmentLoader; -import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.StorageLocationConfig; +import org.apache.druid.segment.loading.*; import
org.apache.druid.server.SegmentManager; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -314,33 +312,6 @@ Because another addSegment() call is executed, which removes the segment from se segmentLoadDropHandler.stop(); } - @Test - public void testSegmentLoading3() throws Exception - { - segmentLoadDropHandler.start(); - String dataSource = "test"; - String version = "1"; - String interval = "P1d/2011-04-01"; - - final DataSegment segment = makeSegment(dataSource, version, Intervals.of(interval)); - - // manually create a local segment under localStorageFolder - final File localSegmentFile = new File( - infoDir, - dataSource + interval + version + "/0" - ); - localSegmentFile.mkdirs(); - final File indexZip = new File(localSegmentFile, "index.zip"); - indexZip.createNewFile(); - final File downloadStartMarker = new File(localSegmentFile, "downloadStartMarker"); - downloadStartMarker.createNewFile(); - - segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP); - Assert.assertFalse(downloadStartMarker.exists()); - - segmentLoadDropHandler.stop(); - } - @Test public void testLoadCache() throws Exception { From a77bf43f5e81b7b10f6aef43f4afa72435ecf480 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 7 Dec 2020 19:44:17 +0800 Subject: [PATCH 7/8] resolve ci failures --- .../druid/server/coordination/SegmentLoadDropHandlerTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 486f8b528614..32368b700c68 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -31,7 +31,9 @@ import org.apache.druid.java.util.common.logger.Logger; import 
org.apache.druid.segment.IndexIO; import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.loading.*; +import org.apache.druid.segment.loading.CacheTestSegmentLoader; +import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; From b73d2e780a7981407212d04367b524976afcc394 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 14 Dec 2020 22:50:12 +0800 Subject: [PATCH 8/8] done --- .../SegmentLoaderLocalCacheManager.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 4746af508ec5..20e8fac68493 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -113,7 +113,14 @@ private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) for (StorageLocation location : locations) { File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); if (localStorageDir.exists()) { - return location; + if (checkSegmentFilesIntact(localStorageDir)) { + log.warn("[%s] may be damaged. 
Delete all the segment files and pull from DeepStorage again.", localStorageDir.getAbsolutePath()); + cleanupCacheFiles(location.getPath(), localStorageDir); + location.removeSegmentDir(localStorageDir, segment); + break; + } else { + return location; + } } } return null; @@ -184,15 +191,7 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException try { StorageLocation loc = findStorageLocationIfLoaded(segment); String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); - if (loc != null) { - File localStorageDir = new File(loc.getPath(), storageDir); - if (checkSegmentFilesIntact(localStorageDir)) { - log.warn("[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.", localStorageDir.getAbsolutePath()); - cleanupCacheFiles(loc.getPath(), localStorageDir); - loc.removeSegmentDir(localStorageDir, segment); - loc = null; - } - } + if (loc == null) { loc = loadSegmentWithRetry(segment, storageDir); }