From bc0a58790e81fce1e08886ff279d563f0ca20f14 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 7 Jan 2021 18:45:47 -0800 Subject: [PATCH 1/2] Add missing unit tests for segment loading in historicals --- .../SegmentLoaderLocalCacheManager.java | 9 ++- .../SegmentLoaderLocalCacheManagerTest.java | 59 ++++++++++++------ .../SegmentLoadDropHandlerTest.java | 61 ++++++++++++++++++- 3 files changed, 105 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index de9bff151693..16bf499a4070 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -31,6 +31,7 @@ import org.apache.druid.timeline.DataSegment; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Iterator; @@ -41,6 +42,9 @@ */ public class SegmentLoaderLocalCacheManager implements SegmentLoader { + @VisibleForTesting + static final String DOWNLOAD_START_MARKER_FILE_NAME = "downloadStartMarker"; + private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); private final IndexIO indexIO; @@ -132,6 +136,7 @@ public boolean isSegmentLoaded(final DataSegment segment) return findStorageLocationIfLoaded(segment) != null; } + @Nullable private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) { for (StorageLocation location : locations) { @@ -167,7 +172,7 @@ private boolean checkSegmentFilesIntact(File dir) */ private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir) { - final File downloadStartMarker = new File(localStorageDir.getPath(), "downloadStartMarker"); + final File downloadStartMarker = new File(localStorageDir.getPath(), DOWNLOAD_START_MARKER_FILE_NAME); return downloadStartMarker.exists(); } @@ -270,7 +275,7 @@ private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) { // We use a marker to prevent the case where a segment is downloaded, but before the download completes, // the parent directories of the segment are removed - final File downloadStartMarker = new File(storageDir, "downloadStartMarker"); + final File downloadStartMarker = new File(storageDir, DOWNLOAD_START_MARKER_FILE_NAME); synchronized (directoryWriteRemoveLock) { if (!storageDir.mkdirs()) { log.debug("Unable to make parent file[%s]", storageDir); diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index a01aef455406..75bbeff61c7a 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -28,9 +28,6 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; -import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; -import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -39,36 +36,22 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import java.io.File; import java.util.ArrayList; -import java.util.Collection; import java.util.List; -@RunWith(Parameterized.class) public class SegmentLoaderLocalCacheManagerTest { - @Parameterized.Parameters - public static Collection constructorFeeder() - { - return ImmutableList.of( - new Object[] {TmpFileSegmentWriteOutMediumFactory.instance()}, - new Object[] {OffHeapMemorySegmentWriteOutMediumFactory.instance()} - ); - } - @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); private final ObjectMapper jsonMapper; - private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; private File localSegmentCacheFolder; private SegmentLoaderLocalCacheManager manager; - public SegmentLoaderLocalCacheManagerTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory) + public SegmentLoaderLocalCacheManagerTest() { jsonMapper = new DefaultObjectMapper(); jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); @@ -78,7 +61,6 @@ public SegmentLoaderLocalCacheManagerTest(SegmentWriteOutMediumFactory segmentWr new LocalDataSegmentPuller() ) ); - this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; } @Before @@ -750,4 +732,43 @@ public void testSegmentDistributionUsingRandomStrategy() throws Exception Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3)); } + @Test + public void testGetSegmentFilesWhenDownloadStartMarkerExists() throws Exception + { + final File localStorageFolder = tmpFolder.newFolder("local_storage_folder"); + + final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + localStorageFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + + // manually create a local segment under localStorageFolder + final File localSegmentFile = new File( + localStorageFolder, + "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0" + ); + Assert.assertTrue(localSegmentFile.mkdirs()); + final File indexZip = new File(localSegmentFile, "index.zip"); + Assert.assertTrue(indexZip.createNewFile()); + + final File cachedSegmentDir = manager.getSegmentFiles(segmentToDownload); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + + // Emulate a corrupted segment file + final File downloadMarker = new File( + cachedSegmentDir, + SegmentLoaderLocalCacheManager.DOWNLOAD_START_MARKER_FILE_NAME + ); + Assert.assertTrue(downloadMarker.createNewFile()); + + Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse(cachedSegmentDir.exists()); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 32368b700c68..8d11902758f9 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -29,15 +29,19 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.loading.CacheTestSegmentLoader; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordination.SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus; +import org.apache.druid.server.coordination.SegmentLoadDropHandler.Status; +import org.apache.druid.server.coordination.SegmentLoadDropHandler.Status.STATE; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; -import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; @@ -45,6 +49,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; @@ -92,6 +98,11 @@ public class SegmentLoadDropHandlerTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + public SegmentLoadDropHandlerTest() + { + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + } + @Before public void setUp() { @@ -234,7 +245,7 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) jsonMapper, segmentLoaderConfig, announcer, - EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), + Mockito.mock(DataSegmentServerAnnouncer.class), segmentManager, scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), new ServerTypeConfig(ServerType.HISTORICAL) @@ -449,7 +460,9 @@ public int getAnnounceIntervalMillis() return 50; } }, - announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager, + announcer, + Mockito.mock(DataSegmentServerAnnouncer.class), + segmentManager, new ServerTypeConfig(ServerType.HISTORICAL) ); @@ -522,4 +535,46 @@ public void testProcessBatch() throws Exception segmentLoadDropHandler.stop(); } + + @Test(timeout = 60_000L) + public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception + { + final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); + Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())) + .thenThrow(new RuntimeException("segment loading failure test")) + .thenReturn(true); + final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( + jsonMapper, + segmentLoaderConfig, + announcer, + Mockito.mock(DataSegmentServerAnnouncer.class), + segmentManager, + scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), + new ServerTypeConfig(ServerType.HISTORICAL) + ); + + segmentLoadDropHandler.start(); + + DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); + + List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); + + ListenableFuture> future = segmentLoadDropHandler + .processBatch(batch); + + for (Runnable runnable : scheduledRunnable) { + runnable.run(); + } + List result = future.get(); + Assert.assertEquals(STATE.FAILED, result.get(0).getStatus().getState()); + + future = segmentLoadDropHandler.processBatch(batch); + for (Runnable runnable : scheduledRunnable) { + runnable.run(); + } + result = future.get(); + Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + + segmentLoadDropHandler.stop(); + } } From d4d72b208b3c6ef389338d84675ff10ee3248066 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 9 Jan 2021 12:30:18 -0800 Subject: [PATCH 2/2] unused import --- .../druid/server/coordination/SegmentLoadDropHandlerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 8d11902758f9..a9797846e64f 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -37,7 +37,6 @@ import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus; -import org.apache.druid.server.coordination.SegmentLoadDropHandler.Status; import org.apache.druid.server.coordination.SegmentLoadDropHandler.Status.STATE; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment;