Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
* both of those. It can also push data to deep storage. But, it does not decide which segments data should go into.
* It also doesn't publish segments to the metadata store or monitor handoff; you have to do that yourself!
* <p>
* You can provide a {@link Committer} or a Supplier of one when you call one of the methods that adds, persists, or
* pushes data. The Committer should represent all data you have given to the Appenderator so far. This Committer will
* be used when that data has been persisted to disk.
* You can provide a {@link Committer} or a Supplier of one when you call one of the methods that {@link #add},
* {@link #persistAll}, or {@link #push}. The Committer should represent all data you have given to the Appenderator so
* far. This Committer will be used when that data has been persisted to disk.
*/
public interface Appenderator extends QuerySegmentWalker, Closeable
{
Expand Down Expand Up @@ -73,8 +73,9 @@ default AppenderatorAddResult add(SegmentIdentifier identifier, InputRow row, Su
* Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used
* asynchronously.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
* If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
* {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
* committed by Committer in sync.
*
* @param identifier the segment into which this row should be added
* @param row the row to add
Expand Down Expand Up @@ -128,8 +129,8 @@ AppenderatorAddResult add(
* for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been
* cleared. This may take some time, since all pending persists must finish first.
* <p>
* The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the
* metadata committed by Committer in sync.
* {@link #add}, {@link #clear}, {@link #persistAll}, and {@link #push} methods should all be called from the same
* thread to keep the metadata committed by Committer in sync.
*/
void clear() throws InterruptedException;

Expand All @@ -147,50 +148,31 @@ AppenderatorAddResult add(
*/
ListenableFuture<?> drop(SegmentIdentifier identifier);

/**
* Persist any in-memory indexed data for segments of the given identifiers to durable storage. This may be only
* somewhat durable, e.g. the machine's local disk. The Committer will be made synchronously with the call to
* persist, but will actually be used asynchronously. Any metadata returned by the committer will be associated with
* the data persisted to disk.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
*
* @param identifiers segment identifiers to be persisted
* @param committer a committer associated with all data that has been added to segments of the given identifiers so
* far
*
* @return future that resolves when all pending data to segments of the identifiers has been persisted, contains
* commit metadata for this persist
*/
ListenableFuture<Object> persist(Collection<SegmentIdentifier> identifiers, @Nullable Committer committer);

/**
* Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the
* machine's local disk. The Committer will be made synchronously with the call to persistAll, but will actually
* be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to
* disk.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
* If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
* {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
* committed by Committer in sync.
*
* @param committer a committer associated with all data that has been added so far
*
* @return future that resolves when all pending data has been persisted, contains commit metadata for this persist
*/
default ListenableFuture<Object> persistAll(@Nullable Committer committer)
{
return persist(getSegments(), committer);
}
ListenableFuture<Object> persistAll(@Nullable Committer committer);

/**
* Merge and push particular segments to deep storage. This will trigger an implicit
* {@link #persist(Collection, Committer)} using the provided Committer.
* {@link #persistAll(Committer)} using the provided Committer.
* <p>
* After this method is called, you cannot add new data to any segments that were previously under construction.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
* If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
* {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
* committed by Committer in sync.
*
* @param identifiers list of segments to push
* @param committer a committer associated with all data that has been added so far
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,12 @@ public ListenableFuture<?> drop(final SegmentIdentifier identifier)
}

@Override
public ListenableFuture<Object> persist(Collection<SegmentIdentifier> identifiers, @Nullable Committer committer)
public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
{
final Map<String, Integer> currentHydrants = Maps.newHashMap();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
int numPersistedRows = 0;
for (SegmentIdentifier identifier : identifiers) {
for (SegmentIdentifier identifier : sinks.keySet()) {
final Sink sink = sinks.get(identifier);
if (sink == null) {
throw new ISE("No sink for identifier: %s", identifier);
Expand Down Expand Up @@ -496,13 +496,6 @@ public Object doCall()
return future;
}

@Override
public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
{
// Submit persistAll task to the persistExecutor
return persist(sinks.keySet(), committer);
}

@Override
public ListenableFuture<SegmentsAndMetadata> push(
final Collection<SegmentIdentifier> identifiers,
Expand All @@ -521,7 +514,9 @@ public ListenableFuture<SegmentsAndMetadata> push(
}

return Futures.transform(
persist(identifiers, committer),
// We should always persist all segments regardless of the input because metadata should be committed for all
// segments.
persistAll(committer),
(Function<Object, SegmentsAndMetadata>) commitMetadata -> {
final List<DataSegment> dataSegments = Lists.newArrayList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,9 @@ public void run()
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get());
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
appenderator.persistAll(committerSupplier.get());
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
}
}

Expand Down Expand Up @@ -238,12 +237,12 @@ public void run()
Assert.assertEquals(4, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier, false);
Assert.assertEquals(5, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get());
Assert.assertEquals(3, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
appenderator.persistAll(committerSupplier.get());
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
}
}

@Test
public void testRestoreFromDisk() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,15 +454,13 @@ public ListenableFuture<?> drop(SegmentIdentifier identifier)
}

@Override
public ListenableFuture<Object> persist(
Collection<SegmentIdentifier> identifiers, Committer committer
)
public ListenableFuture<Object> persistAll(Committer committer)
{
if (persistEnabled) {
// do nothing
return Futures.immediateFuture(committer.getMetadata());
} else {
return Futures.immediateFailedFuture(new ISE("Fail test while persisting segments[%s]", identifiers));
return Futures.immediateFailedFuture(new ISE("Fail test while persisting segments[%s]", rows.keySet()));
}
}

Expand All @@ -488,7 +486,7 @@ public ListenableFuture<SegmentsAndMetadata> push(
)
.collect(Collectors.toList());
return Futures.transform(
persist(identifiers, committer),
persistAll(committer),
(Function<Object, SegmentsAndMetadata>) commitMetadata -> new SegmentsAndMetadata(segments, commitMetadata)
);
} else {
Expand Down