Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ public void close()
}

// Future with cancel() implementation to remove it from "waitingFutures" list
private static class CustomSettableFuture extends AbstractFuture<List<DataSegmentChangeRequestAndStatus>>
private class CustomSettableFuture extends AbstractFuture<List<DataSegmentChangeRequestAndStatus>>
{
private final LinkedHashSet<CustomSettableFuture> waitingFutures;
private final Map<DataSegmentChangeRequest, AtomicReference<Status>> statusRefs;
Expand All @@ -789,15 +789,20 @@ private CustomSettableFuture(

public void resolve()
{
synchronized (statusRefs) {
synchronized (requestStatusesLock) {
if (isDone()) {
return;
}

List<DataSegmentChangeRequestAndStatus> result = new ArrayList<>(statusRefs.size());
statusRefs.forEach(
(request, statusRef) -> result.add(new DataSegmentChangeRequestAndStatus(request, statusRef.get()))
);
final List<DataSegmentChangeRequestAndStatus> result = new ArrayList<>(statusRefs.size());
statusRefs.forEach((request, statusRef) -> {
// Remove complete statuses from the cache
final Status status = statusRef.get();
if (status != null && status.getState() != Status.STATE.PENDING) {
requestStatuses.invalidate(request);
}
result.add(new DataSegmentChangeRequestAndStatus(request, status));
});

set(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,12 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ
public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception
{
final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()))
.thenReturn(true);
Mockito.doReturn(true).when(segmentManager).loadSegment(
ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any());
final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
jsonMapper,
Expand All @@ -578,11 +582,11 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio

segmentLoadDropHandler.start();

DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
final DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));

List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));

// load the segment
// Request 1: Load the segment
ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler
.processBatch(batch);
for (Runnable runnable : scheduledRunnable) {
Expand All @@ -592,7 +596,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
scheduledRunnable.clear();

// drop the segment
// Request 2: Drop the segment
batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1));
future = segmentLoadDropHandler.processBatch(batch);
for (Runnable runnable : scheduledRunnable) {
Expand All @@ -603,23 +607,36 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio
scheduledRunnable.clear();

// check invocations after a load-drop sequence
Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(
ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
Mockito.verify(segmentManager, Mockito.times(1))
.dropSegment(ArgumentMatchers.any());

// try to reload the segment - this should be a no-op since it might be the case that this is the first load client
// with this request, we'll forget about the success of the load request
// Request 3: Reload the segment
batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
future = segmentLoadDropHandler.processBatch(batch);
Assert.assertEquals(scheduledRunnable.size(), 0);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
result = future.get();
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
scheduledRunnable.clear();

// check invocations - should stay the same
Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
// check invocations - 1 more load has happened
Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(
ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
Mockito.verify(segmentManager, Mockito.times(1))
.dropSegment(ArgumentMatchers.any());

// try to reload the segment - this time the loader will know that is a fresh request to load
// so, the segment manager will be asked to load
// Request 4: Try to reload the segment - segment is loaded again
batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
future = segmentLoadDropHandler.processBatch(batch);
for (Runnable runnable : scheduledRunnable) {
Expand All @@ -630,8 +647,14 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio
scheduledRunnable.clear();

// check invocations - the load segment counter should bump up
Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
Mockito.verify(segmentManager, Mockito.times(3)).loadSegment(
ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
Mockito.verify(segmentManager, Mockito.times(1))
.dropSegment(ArgumentMatchers.any());

segmentLoadDropHandler.stop();
}
Expand Down