From 5f90c56c0596bec9036cb11b6985cc983a59730c Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 1 May 2018 16:54:25 -0700 Subject: [PATCH] [Backport] Fix Appenderator.push() to commit the metadata of all segments --- .../realtime/appenderator/Appenderator.java | 50 ++++++------------- .../appenderator/AppenderatorImpl.java | 15 ++---- .../appenderator/AppenderatorTest.java | 11 ++-- .../StreamAppenderatorDriverFailTest.java | 8 ++- 4 files changed, 29 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java index 51708f4f31cd..7f06f86269e3 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java @@ -36,9 +36,9 @@ * both of those. It can also push data to deep storage. But, it does not decide which segments data should go into. * It also doesn't publish segments to the metadata store or monitor handoff; you have to do that yourself! *

- * You can provide a {@link Committer} or a Supplier of one when you call one of the methods that adds, persists, or - * pushes data. The Committer should represent all data you have given to the Appenderator so far. This Committer will - * be used when that data has been persisted to disk. + * You can provide a {@link Committer} or a Supplier of one when you call one of the methods that {@link #add}, + * {@link #persistAll}, or {@link #push}. The Committer should represent all data you have given to the Appenderator so + * far. This Committer will be used when that data has been persisted to disk. */ public interface Appenderator extends QuerySegmentWalker, Closeable { @@ -73,8 +73,9 @@ default AppenderatorAddResult add(SegmentIdentifier identifier, InputRow row, Su * Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used * asynchronously. *

- * If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll, - * and push methods should all be called from the same thread to keep the metadata committed by Committer in sync. + * If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear}, + * {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata + * committed by Committer in sync. * * @param identifier the segment into which this row should be added * @param row the row to add @@ -128,8 +129,8 @@ AppenderatorAddResult add( * for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been * cleared. This may take some time, since all pending persists must finish first. *

- * The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the - * metadata committed by Committer in sync. + * {@link #add}, {@link #clear}, {@link #persistAll}, and {@link #push} methods should all be called from the same + * thread to keep the metadata committed by Committer in sync. */ void clear() throws InterruptedException; @@ -147,50 +148,31 @@ AppenderatorAddResult add( */ ListenableFuture drop(SegmentIdentifier identifier); - /** - * Persist any in-memory indexed data for segments of the given identifiers to durable storage. This may be only - * somewhat durable, e.g. the machine's local disk. The Committer will be made synchronously with the call to - * persist, but will actually be used asynchronously. Any metadata returned by the committer will be associated with - * the data persisted to disk. - *

- * If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll, - * and push methods should all be called from the same thread to keep the metadata committed by Committer in sync. - * - * @param identifiers segment identifiers to be persisted - * @param committer a committer associated with all data that has been added to segments of the given identifiers so - * far - * - * @return future that resolves when all pending data to segments of the identifiers has been persisted, contains - * commit metadata for this persist - */ - ListenableFuture persist(Collection identifiers, @Nullable Committer committer); - /** * Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the * machine's local disk. The Committer will be made synchronously with the call to persistAll, but will actually * be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to * disk. *

- * If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll, - * and push methods should all be called from the same thread to keep the metadata committed by Committer in sync. + * If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear}, + * {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata + * committed by Committer in sync. * * @param committer a committer associated with all data that has been added so far * * @return future that resolves when all pending data has been persisted, contains commit metadata for this persist */ - default ListenableFuture persistAll(@Nullable Committer committer) - { - return persist(getSegments(), committer); - } + ListenableFuture persistAll(@Nullable Committer committer); /** * Merge and push particular segments to deep storage. This will trigger an implicit - * {@link #persist(Collection, Committer)} using the provided Committer. + * {@link #persistAll(Committer)} using the provided Committer. *

* After this method is called, you cannot add new data to any segments that were previously under construction. *

- * If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll, - * and push methods should all be called from the same thread to keep the metadata committed by Committer in sync. + * If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear}, + * {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata + * committed by Committer in sync. * * @param identifiers list of segments to push * @param committer a committer associated with all data that has been added so far diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 3d872e25e6af..70c22187d8dd 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -389,12 +389,12 @@ public ListenableFuture drop(final SegmentIdentifier identifier) } @Override - public ListenableFuture persist(Collection identifiers, @Nullable Committer committer) + public ListenableFuture persistAll(@Nullable final Committer committer) { final Map currentHydrants = Maps.newHashMap(); final List> indexesToPersist = Lists.newArrayList(); int numPersistedRows = 0; - for (SegmentIdentifier identifier : identifiers) { + for (SegmentIdentifier identifier : sinks.keySet()) { final Sink sink = sinks.get(identifier); if (sink == null) { throw new ISE("No sink for identifier: %s", identifier); @@ -496,13 +496,6 @@ public Object doCall() return future; } - @Override - public ListenableFuture persistAll(@Nullable final Committer committer) - { - // Submit persistAll task to the persistExecutor - return persist(sinks.keySet(), committer); - } - @Override public ListenableFuture push( final Collection identifiers, @@ -521,7 +514,9 @@ public ListenableFuture push( } return Futures.transform( - persist(identifiers, committer), + // We should always persist all segments regardless of the input because metadata should be committed for all + // segments. + persistAll(committer), (Function) commitMetadata -> { final List dataSegments = Lists.newArrayList(); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java index 85bcd22ced33..17b32218fad9 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java @@ -191,10 +191,9 @@ public void run() Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier); Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); - appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get()); - Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); - appenderator.close(); + appenderator.persistAll(committerSupplier.get()); Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.close(); } } @@ -238,12 +237,12 @@ public void run() Assert.assertEquals(4, ((AppenderatorImpl) appenderator).getRowsInMemory()); appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier, false); Assert.assertEquals(5, ((AppenderatorImpl) appenderator).getRowsInMemory()); - appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get()); - Assert.assertEquals(3, ((AppenderatorImpl) appenderator).getRowsInMemory()); - appenderator.close(); + appenderator.persistAll(committerSupplier.get()); Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.close(); } } + @Test public void testRestoreFromDisk() throws Exception { diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index a89d9565c8c8..9535bb3868da 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -454,15 +454,13 @@ public ListenableFuture drop(SegmentIdentifier identifier) } @Override - public ListenableFuture persist( - Collection identifiers, Committer committer - ) + public ListenableFuture persistAll(Committer committer) { if (persistEnabled) { // do nothing return Futures.immediateFuture(committer.getMetadata()); } else { - return Futures.immediateFailedFuture(new ISE("Fail test while persisting segments[%s]", identifiers)); + return Futures.immediateFailedFuture(new ISE("Fail test while persisting segments[%s]", rows.keySet())); } } @@ -488,7 +486,7 @@ public ListenableFuture push( ) .collect(Collectors.toList()); return Futures.transform( - persist(identifiers, committer), + persistAll(committer), (Function) commitMetadata -> new SegmentsAndMetadata(segments, commitMetadata) ); } else {