Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
Expand All @@ -41,6 +42,9 @@
*/
public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
@VisibleForTesting
static final String DOWNLOAD_START_MARKER_FILE_NAME = "downloadStartMarker";

private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);

private final IndexIO indexIO;
Expand Down Expand Up @@ -132,6 +136,7 @@ public boolean isSegmentLoaded(final DataSegment segment)
return findStorageLocationIfLoaded(segment) != null;
}

@Nullable
private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : locations) {
Expand Down Expand Up @@ -167,7 +172,7 @@ private boolean checkSegmentFilesIntact(File dir)
*/
private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir)
{
  // The marker file is created before a segment download starts and is removed only after
  // the download completes, so a surviving marker means the download was interrupted and
  // the files in this directory may be incomplete.
  // Returns true when the marker still exists (i.e., the cached files are suspect).
  final File downloadStartMarker = new File(localStorageDir.getPath(), DOWNLOAD_START_MARKER_FILE_NAME);
  return downloadStartMarker.exists();
}

Expand Down Expand Up @@ -270,7 +275,7 @@ private void loadInLocationWithStartMarker(DataSegment segment, File storageDir)
{
// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
// the parent directories of the segment are removed
final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
final File downloadStartMarker = new File(storageDir, DOWNLOAD_START_MARKER_FILE_NAME);
synchronized (directoryWriteRemoveLock) {
if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
Expand All @@ -39,36 +36,22 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

@RunWith(Parameterized.class)
public class SegmentLoaderLocalCacheManagerTest
{
@Parameterized.Parameters
public static Collection<?> constructorFeeder()
{
return ImmutableList.of(
new Object[] {TmpFileSegmentWriteOutMediumFactory.instance()},
new Object[] {OffHeapMemorySegmentWriteOutMediumFactory.instance()}
);
}

@Rule
public final TemporaryFolder tmpFolder = new TemporaryFolder();

private final ObjectMapper jsonMapper;
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;

private File localSegmentCacheFolder;
private SegmentLoaderLocalCacheManager manager;

public SegmentLoaderLocalCacheManagerTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
public SegmentLoaderLocalCacheManagerTest()
{
jsonMapper = new DefaultObjectMapper();
jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
Expand All @@ -78,7 +61,6 @@ public SegmentLoaderLocalCacheManagerTest(SegmentWriteOutMediumFactory segmentWr
new LocalDataSegmentPuller()
)
);
this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
}

@Before
Expand Down Expand Up @@ -750,4 +732,43 @@ public void testSegmentDistributionUsingRandomStrategy() throws Exception
Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3));
}

@Test
public void testGetSegmentFilesWhenDownloadStartMarkerExists() throws Exception
{
  final File storageBase = tmpFolder.newFolder("local_storage_folder");
  final String relativeSegmentPath =
      "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0";

  // Manually lay out a segment archive under the local "deep storage" folder for the loader to pull.
  final File segmentDir = new File(storageBase, relativeSegmentPath);
  Assert.assertTrue(segmentDir.mkdirs());
  Assert.assertTrue(new File(segmentDir, "index.zip").createNewFile());

  final DataSegment segment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec(
      ImmutableMap.of(
          "type",
          "local",
          "path",
          storageBase.getCanonicalPath() + "/" + relativeSegmentPath + "/index.zip"
      )
  );

  final File cacheDir = manager.getSegmentFiles(segment);
  Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segment));

  // Leaving the download-start marker behind emulates a download that never finished,
  // i.e. a corrupted cached segment.
  Assert.assertTrue(
      new File(cacheDir, SegmentLoaderLocalCacheManager.DOWNLOAD_START_MARKER_FILE_NAME).createNewFile()
  );

  Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentLoaded(segment));
  Assert.assertFalse(cacheDir.exists());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,27 @@
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.loading.CacheTestSegmentLoader;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus;
import org.apache.druid.server.coordination.SegmentLoadDropHandler.Status.STATE;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -92,6 +97,11 @@ public class SegmentLoadDropHandlerTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

public SegmentLoadDropHandlerTest()
{
  // Register a no-op emitter up front so that EmittingLogger-based logging in the code
  // under test has an emitter to write to during these tests.
  EmittingLogger.registerEmitter(new NoopServiceEmitter());
}

@Before
public void setUp()
{
Expand Down Expand Up @@ -234,7 +244,7 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
jsonMapper,
segmentLoaderConfig,
announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
Mockito.mock(DataSegmentServerAnnouncer.class),
segmentManager,
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
new ServerTypeConfig(ServerType.HISTORICAL)
Expand Down Expand Up @@ -449,7 +459,9 @@ public int getAnnounceIntervalMillis()
return 50;
}
},
announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager,
announcer,
Mockito.mock(DataSegmentServerAnnouncer.class),
segmentManager,
new ServerTypeConfig(ServerType.HISTORICAL)
);

Expand Down Expand Up @@ -522,4 +534,46 @@ public void testProcessBatch() throws Exception

segmentLoadDropHandler.stop();
}

@Test(timeout = 60_000L)
public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception
{
  // Stub the segment manager so the first load attempt throws and every later attempt succeeds.
  final SegmentManager mockSegmentManager = Mockito.mock(SegmentManager.class);
  Mockito.when(mockSegmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean()))
         .thenThrow(new RuntimeException("segment loading failure test"))
         .thenReturn(true);

  final SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
      jsonMapper,
      segmentLoaderConfig,
      announcer,
      Mockito.mock(DataSegmentServerAnnouncer.class),
      mockSegmentManager,
      scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
      new ServerTypeConfig(ServerType.HISTORICAL)
  );

  handler.start();

  final List<DataSegmentChangeRequest> batch = ImmutableList.of(
      new SegmentChangeRequestLoad(makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")))
  );

  // First submission: the stubbed loader throws, so the request must be reported FAILED.
  ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = handler.processBatch(batch);
  for (Runnable runnable : scheduledRunnable) {
    runnable.run();
  }
  Assert.assertEquals(STATE.FAILED, future.get().get(0).getStatus().getState());

  // Re-submitting the identical request: the retry should now go through and report SUCCESS.
  future = handler.processBatch(batch);
  for (Runnable runnable : scheduledRunnable) {
    runnable.run();
  }
  Assert.assertEquals(STATE.SUCCESS, future.get().get(0).getStatus().getState());

  handler.stop();
}
}