From 475ee7b803b67cfb07469f84d5bb597b4e85faee Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Fri, 10 Aug 2018 18:16:35 -0700
Subject: [PATCH 1/5] Fix three bugs with segment publishing.

1. In AppenderatorImpl: always use a unique path if requested, even if the
   segment was already pushed. This is important because if we don't do this,
   it causes the issue mentioned in #6124.
2. In IndexerSQLMetadataStorageCoordinator: Fix a bug that could cause it to
   return a "not published" result instead of throwing an exception, when
   there was one metadata update failure, followed by some random exception.
   This is done by resetting the AtomicBoolean that tracks what case we're
   in, each time the callback runs.
3. In BaseAppenderatorDriver: Only kill segments if we get an affirmative
   false publish result. Skip killing if we just got some exception. The
   reason for this is that we want to avoid killing segments if they are in
   an unknown state.

Two other changes to clarify the contracts a bit and hopefully prevent
future bugs:

1. Return SegmentPublishResult from TransactionalSegmentPublisher, to make
   it more similar to announceHistoricalSegments.
2. Make it explicit, at multiple levels of javadocs, that a "false" publish
   result must indicate that the publish _definitely_ did not happen. Unknown
   states must be exceptions. This helps BaseAppenderatorDriver do the right
   thing.
---
 ...ementalPublishingKafkaIndexTaskRunner.java |  2 +-
 .../kafka/LegacyKafkaIndexTaskRunner.java     |  2 +-
 .../AppenderatorDriverRealtimeIndexTask.java  |  2 +-
 .../druid/indexing/common/task/IndexTask.java |  2 +-
 .../SinglePhaseParallelIndexTaskRunner.java   |  4 +-
 .../IndexerMetadataStorageCoordinator.java    | 12 ++--
 .../IndexerSQLMetadataStorageCoordinator.java | 63 +++++++++++--------
 .../appenderator/AppenderatorImpl.java        | 11 +++-
 .../appenderator/BaseAppenderatorDriver.java  | 25 +++----
 .../TransactionalSegmentPublisher.java        |  9 ++-
 10 files changed, 77 insertions(+), 55 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index a93fde611c5c..f8d2e10c338b 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -1749,7 +1749,7 @@ TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTr
 
       log.info("Publishing with isTransaction[%s].", useTransaction);
 
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
   }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index b7cdd3e96e44..a9dff63aefc7 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -518,7 +518,7 @@ public void run()
 
         log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());
 
-        return toolbox.getTaskActionClient().submit(action).isSuccess();
+        return toolbox.getTaskActionClient().submit(action);
       };
 
       // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
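The Kafka runner changes above, and the task changes below, all have the same shape: the publisher lambda used to collapse the task action's result to a boolean inside the lambda, which threw away the distinction between "definitely not published" and "unknown outcome". A minimal sketch of the before-and-after contract, using simplified stand-ins for the Druid types (the classes below are hypothetical, not the real SegmentPublishResult or TransactionalSegmentPublisher):

```java
import java.util.Collections;
import java.util.Set;

public class PublisherSketch
{
  // Simplified stand-in for io.druid.indexing.overlord.SegmentPublishResult.
  static class SegmentPublishResult
  {
    private final Set<String> segments;
    private final boolean success;

    SegmentPublishResult(Set<String> segments, boolean success)
    {
      this.segments = segments;
      this.success = success;
    }

    static SegmentPublishResult fail()
    {
      // A "fail" result is only legal when the publish definitely did not happen.
      return new SegmentPublishResult(Collections.<String>emptySet(), false);
    }

    boolean isSuccess()
    {
      return success;
    }
  }

  interface TransactionalSegmentPublisher
  {
    SegmentPublishResult publishSegments(Set<String> segments, Object commitMetadata);
  }

  public static void main(String[] args)
  {
    // After the patch, the lambda forwards the full result instead of
    // collapsing it to a boolean with isSuccess() inside the lambda.
    TransactionalSegmentPublisher publisher =
        (segments, commitMetadata) -> new SegmentPublishResult(segments, true);

    // Call sites that only need a boolean now ask for it explicitly.
    boolean published = publisher
        .publishSegments(Collections.singleton("segment-1"), null)
        .isSuccess();
    System.out.println("published = " + published);
  }
}
```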
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index c1ee50b23e9a..65be114bae62 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -297,7 +297,7 @@ dataSchema, new RealtimeIOConfig(null, null, null), null
 
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
 
     // Skip connecting firehose if we've been stopped before we got started.
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 6d4cfba4fb85..4b14f68e461f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -949,7 +949,7 @@ dataSchema, new RealtimeIOConfig(null, null, null), null
 
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
 
     try (
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
index ae1addfe63e9..8ad4ec28f00a 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
@@ -382,7 +382,7 @@ private void publish(TaskToolbox toolbox) throws IOException
   {
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
     final UsedSegmentChecker usedSegmentChecker = new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient());
     final Set<DataSegment> segmentsToPublish = segmentsMap
@@ -390,7 +390,7 @@ private void publish(TaskToolbox toolbox) throws IOException
         .stream()
         .flatMap(report -> report.getSegments().stream())
         .collect(Collectors.toSet());
-    final boolean published = publisher.publishSegments(segmentsToPublish, null);
+    final boolean published = publisher.publishSegments(segmentsToPublish, null).isSuccess();
 
     if (published) {
       log.info("Published segments");
diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 0a7acaea1597..a90ca26bbf5d 100644
--- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -50,13 +50,14 @@ default List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval
 
   /**
    * Get all used segments and the created_date of these segments in a given datasource and interval
-   * 
+   *
   * @param dataSource The datasource to query
   * @param interval   The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive
+   *
   * @return The DataSegments and the related created_date of segments which include data in the requested interval
   */
  List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval);
- 
+
  /**
   * Get all segments which may include any data in the interval and are flagged as used.
   *
@@ -134,9 +135,12 @@ SegmentIdentifier allocatePendingSegment(
   *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not
   *                      involve a metadata transaction
   *
-  * @return segment publish result indicating transaction success or failure, and set of segments actually published
+  * @return segment publish result indicating transaction success or failure, and set of segments actually published.
+  * This method must only return a failure code if it is sure that the transaction did not happen. If it is not sure,
+  * it must throw an exception instead.
   *
   * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
+  * @throws RuntimeException if the state of metadata storage after this call is unknown
   */
  SegmentPublishResult announceHistoricalSegments(
      Set<DataSegment> segments,
@@ -177,7 +181,7 @@ SegmentPublishResult announceHistoricalSegments(
   * @return true if the entry was inserted, false otherwise
   */
  boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata);
- 
+
  void updateSegmentMetadata(Set<DataSegment> segments);
 
  void deleteSegments(Set<DataSegment> segments);
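The clarified javadoc pins down a three-way contract for callers. A hedged sketch of how a consumer is expected to branch on it (the Coordinator interface below is a simplified stand-in, not the real IndexerMetadataStorageCoordinator; only the outcome semantics are the point):

```java
import java.util.Collections;
import java.util.Set;

public class PublishOutcomeSketch
{
  interface Coordinator
  {
    /** Returns true only on success; returns false only if the publish definitely did not happen. */
    boolean announce(Set<String> segments);
  }

  static void publish(Coordinator coordinator, Set<String> segments)
  {
    try {
      if (coordinator.announce(segments)) {
        System.out.println("Definitely published: safe to await handoff.");
      } else {
        System.out.println("Definitely not published: safe to clean up or check for a competing publisher.");
      }
    }
    catch (RuntimeException e) {
      // Unknown state: the transaction may or may not have committed.
      // Cleanup must be skipped; surface the error instead.
      throw e;
    }
  }

  public static void main(String[] args)
  {
    publish(segments -> true, Collections.singleton("segment-1"));
    publish(segments -> false, Collections.singleton("segment-1"));
  }
}
```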
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 75e5d9a898f2..82b278787721 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -311,7 +311,7 @@ public SegmentPublishResult announceHistoricalSegments(
       }
     }
 
-    final AtomicBoolean txnFailure = new AtomicBoolean(false);
+    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
 
     try {
       return connector.retryTransaction(
@@ -323,6 +323,8 @@ public SegmentPublishResult inTransaction(
               final TransactionStatus transactionStatus
           ) throws Exception
           {
+            definitelyNotUpdated.set(false);
+
             final Set<DataSegment> inserted = Sets.newHashSet();
 
             if (startMetadata != null) {
@@ -334,8 +336,9 @@ public SegmentPublishResult inTransaction(
               );
 
               if (result != DataSourceMetadataUpdateResult.SUCCESS) {
+                // Metadata was definitely not updated.
                 transactionStatus.setRollbackOnly();
-                txnFailure.set(true);
+                definitelyNotUpdated.set(true);
 
                 if (result == DataSourceMetadataUpdateResult.FAILURE) {
                   throw new RuntimeException("Aborting transaction!");
@@ -359,9 +362,10 @@ public SegmentPublishResult inTransaction(
       );
     }
     catch (CallbackFailedException e) {
-      if (txnFailure.get()) {
-        return new SegmentPublishResult(ImmutableSet.of(), false);
+      if (definitelyNotUpdated.get()) {
+        return SegmentPublishResult.fail();
       } else {
+        // Must throw exception if we are not sure if we updated or not.
         throw e;
       }
     }
@@ -890,7 +894,12 @@ private byte[] getDataSourceMetadataWithHandleAsBytes(
   * @param endMetadata   dataSource metadata post-insert will have this endMetadata merged in with
   *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}
   *
-  * @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata
+  * @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE or
+  * TRY_AGAIN if it definitely was not updated. This guarantee is meant to help
+  * {@link #announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
+  * achieve its own guarantee.
+  *
+  * @throws RuntimeException if state is unknown after this call
   */
  protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
      final Handle handle,
@@ -1163,29 +1172,31 @@ public List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(S
         handle -> handle.createQuery(
             StringUtils.format(
                 "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource " +
-                    "AND start >= :start AND %2$send%2$s <= :end AND used = true",
+                "AND start >= :start AND %2$send%2$s <= :end AND used = true",
                 dbTables.getSegmentsTable(), connector.getQuoteString()
             )
         )
-            .bind("dataSource", dataSource)
-            .bind("start", interval.getStart().toString())
-            .bind("end", interval.getEnd().toString())
-            .map(new ResultSetMapper<Pair<DataSegment, String>>()
-            {
-              @Override
-              public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx) throws SQLException
-              {
-                try {
-                  return new Pair<>(
-                      jsonMapper.readValue(r.getBytes("payload"), DataSegment.class),
-                      r.getString("created_date"));
-                }
-                catch (IOException e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            })
-            .list()
+                       .bind("dataSource", dataSource)
+                       .bind("start", interval.getStart().toString())
+                       .bind("end", interval.getEnd().toString())
+                       .map(new ResultSetMapper<Pair<DataSegment, String>>()
+                       {
+                         @Override
+                         public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx)
+                             throws SQLException
+                         {
+                           try {
+                             return new Pair<>(
+                                 jsonMapper.readValue(r.getBytes("payload"), DataSegment.class),
+                                 r.getString("created_date")
+                             );
+                           }
+                           catch (IOException e) {
+                             throw new RuntimeException(e);
+                           }
+                         }
+                       })
+                       .list()
     );
   }
@@ -1197,7 +1208,7 @@ public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata me
           .createStatement(
               StringUtils.format(
                   "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES" +
-                      " (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
+                  " (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
                   dbTables.getDataSourceTable()
               )
           )
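The `definitelyNotUpdated.set(false)` at the top of the callback is the heart of bug fix 2 from the commit message: without it, a definite metadata failure on attempt N followed by an unrelated exception on a retry is misread as a definite failure. A stripped-down, self-contained model of the failure mode and the fix (this is not the real SQLMetadataConnector retry loop; the exception types and retry policy are simplified for illustration):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RetryFlagSketch
{
  interface Callback
  {
    void inTransaction(int attempt) throws Exception;
  }

  // Stand-in for the "metadata compare-and-swap definitely failed" case.
  static class DefinitelyNotUpdatedException extends Exception
  {
    DefinitelyNotUpdatedException(String message)
    {
      super(message);
    }
  }

  static String runWithRetries(Callback callback, int maxTries)
  {
    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
    Exception lastError = null;

    for (int attempt = 1; attempt <= maxTries; attempt++) {
      // The fix: clear the flag on every attempt, so a definite failure from
      // attempt N cannot leak into the verdict for attempt N+1.
      definitelyNotUpdated.set(false);
      try {
        callback.inTransaction(attempt);
        return "SUCCESS";
      }
      catch (DefinitelyNotUpdatedException e) {
        definitelyNotUpdated.set(true);
        lastError = e;
      }
      catch (Exception e) {
        lastError = e;
      }
    }

    // Mirrors the CallbackFailedException handler: report a definite failure
    // only if the *last* attempt said so; otherwise the outcome is unknown
    // (the real code rethrows in that case).
    return definitelyNotUpdated.get()
           ? "FAILED (definitely not published)"
           : "UNKNOWN (" + lastError.getMessage() + ")";
  }

  public static void main(String[] args)
  {
    // Attempt 1 fails definitely; attempt 2 dies with an unrelated error.
    // With the reset, this correctly reports UNKNOWN instead of FAILED.
    System.out.println(runWithRetries(
        attempt -> {
          if (attempt == 1) {
            throw new DefinitelyNotUpdatedException("metadata compare-and-swap failed");
          }
          throw new Exception("connection dropped mid-transaction");
        },
        2
    ));
  }
}
```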
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 5b79626880d2..4bb0857843b4 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -691,8 +691,15 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink
     try {
       if (descriptorFile.exists()) {
         // Already pushed.
-        log.info("Segment[%s] already pushed.", identifier);
-        return objectMapper.readValue(descriptorFile, DataSegment.class);
+
+        if (useUniquePath) {
+          // Don't reuse the descriptor, because the caller asked for a unique path. Leave the old one as-is, since
+          // it might serve some unknown purpose.
+          log.info("Pushing segment[%s] again with new unique path.", identifier);
+        } else {
+          log.info("Segment[%s] already pushed.", identifier);
+          return objectMapper.readValue(descriptorFile, DataSegment.class);
+        }
       }
 
       log.info("Pushing merged index for segment[%s].", identifier);
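A toy model of the AppenderatorImpl fix, to make the control flow concrete: previously an existing descriptor file always short-circuited the push, so a caller asking for a unique path could be handed a stale location. The descriptor file below stands in for the real segment descriptor and the "push" just returns a string; only the useUniquePath branching mirrors the patch:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class UniquePathPushSketch
{
  static String mergeAndPush(Path taskDir, String identifier, boolean useUniquePath) throws IOException
  {
    final Path descriptorFile = taskDir.resolve("descriptor.json");

    if (Files.exists(descriptorFile)) {
      if (useUniquePath) {
        // Don't reuse the descriptor: the caller asked for a unique path, so
        // push again and leave the old descriptor as-is.
        System.out.println("Pushing segment[" + identifier + "] again with new unique path.");
      } else {
        System.out.println("Segment[" + identifier + "] already pushed.");
        return new String(Files.readAllBytes(descriptorFile));
      }
    }

    // Stand-in for the real push: unique pushes get a fresh storage location.
    final String location = identifier + (useUniquePath ? "_" + UUID.randomUUID() : "");
    if (!useUniquePath) {
      Files.write(descriptorFile, location.getBytes());
    }
    return location;
  }

  public static void main(String[] args) throws IOException
  {
    final Path dir = Files.createTempDirectory("push-sketch");
    System.out.println(mergeAndPush(dir, "seg1", false)); // pushes, writes descriptor
    System.out.println(mergeAndPush(dir, "seg1", false)); // short-circuits: reuses descriptor
    System.out.println(mergeAndPush(dir, "seg1", true));  // the fix: pushes again anyway
  }
}
```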
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index a5a6d2071d29..43ec063c3945 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -555,38 +555,33 @@ ListenableFuture<SegmentsAndMetadata> publishInBackground(
             final boolean published = publisher.publishSegments(
                 ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
                 metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
-            );
+            ).isSuccess();
 
             if (published) {
               log.info("Published segments.");
             } else {
-              log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
+              log.info("Transaction failure while publishing segments, removing them from deep storage " +
+                       "and checking if someone else beat us to publishing.");
+
+              segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+
               final Set<SegmentIdentifier> segmentsIdentifiers = segmentsAndMetadata
                   .getSegments()
                   .stream()
                   .map(SegmentIdentifier::fromDataSegment)
                   .collect(Collectors.toSet());
+
               if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
                                     .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
-                log.info(
-                    "Removing our segments from deep storage because someone else already published them: %s",
-                    segmentsAndMetadata.getSegments()
-                );
-                segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
                 log.info("Our segments really do exist, awaiting handoff.");
               } else {
-                throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments());
+                throw new ISE("Failed to publish segments.");
               }
             }
           }
           catch (Exception e) {
-            log.warn(
-                "Removing segments from deep storage after failed publish: %s",
-                segmentsAndMetadata.getSegments()
-            );
-            segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
+            // Must not remove segments here, we aren't sure if our transaction succeeded or not.
+            log.warn(e, "Failed publish, not removing segments: %s", segmentsAndMetadata.getSegments());
             throw Throwables.propagate(e);
           }
         }
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index d49b18162118..05220aa2c87e 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -19,6 +19,8 @@
 
 package io.druid.segment.realtime.appenderator;
 
+import io.druid.indexing.overlord.DataSourceMetadata;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
@@ -30,11 +32,14 @@ public interface TransactionalSegmentPublisher
   /**
    * Publish segments, along with some commit metadata, in a single transaction.
    *
-   * @return true if segments were published, false if they were not published due to txn failure with the metadata
+   * @return publish result that indicates if segments were published or not. If it is unclear
+   * if the segments were published or not, this method must throw an exception. The behavior is similar to
+   * {@link io.druid.metadata.IndexerSQLMetadataStorageCoordinator#announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}.
    *
    * @throws IOException if there was an I/O error when publishing
+   * @throws RuntimeException if we cannot tell if the segments were published or not, for some other reason
    */
-  boolean publishSegments(
+  SegmentPublishResult publishSegments(
       Set<DataSegment> segments,
       @Nullable Object commitMetadata
   ) throws IOException;

From 64c4da95f4ca198b1dbf6ecf295f039c4a0e5414 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Mon, 13 Aug 2018 15:26:28 -0700
Subject: [PATCH 2/5] Remove javadoc-only import.

---
 .../realtime/appenderator/TransactionalSegmentPublisher.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index 05220aa2c87e..9a30fbd2fcf5 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -19,7 +19,6 @@
 
 package io.druid.segment.realtime.appenderator;
 
-import io.druid.indexing.overlord.DataSourceMetadata;
 import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.timeline.DataSegment;
 
@@ -34,7 +33,7 @@ public interface TransactionalSegmentPublisher
    *
    * @return publish result that indicates if segments were published or not. If it is unclear
    * if the segments were published or not, this method must throw an exception. The behavior is similar to
-   * {@link io.druid.metadata.IndexerSQLMetadataStorageCoordinator#announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}.
+   * IndexerSQLMetadataStorageCoordinator's announceHistoricalSegments.
    *
    * @throws IOException if there was an I/O error when publishing
    * @throws RuntimeException if we cannot tell if the segments were published or not, for some other reason
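The BaseAppenderatorDriver change in patch 1 above condenses to a small decision table: kill segment files on an affirmative "not published" result, never on an exception. A self-contained sketch of that rule with stand-in types (Publisher and Killer below are simplified, not the real Druid interfaces):

```java
import java.util.Collections;
import java.util.Set;

public class PublishCleanupSketch
{
  interface Publisher
  {
    boolean publish(Set<String> segments) throws Exception;
  }

  interface Killer
  {
    void killQuietly(String segment);
  }

  static void publishAndMaybeCleanUp(Publisher publisher, Killer killer, Set<String> segments) throws Exception
  {
    try {
      if (publisher.publish(segments)) {
        System.out.println("Published segments.");
      } else {
        // Definite transaction failure: removing from deep storage is safe,
        // then check whether someone else beat us to publishing.
        segments.forEach(killer::killQuietly);
        System.out.println("Transaction failure; segments removed, checking for a competing publish.");
      }
    }
    catch (Exception e) {
      // Must not remove segments here; we aren't sure whether the
      // transaction succeeded or not.
      System.out.println("Failed publish, not removing segments: " + segments);
      throw e;
    }
  }

  public static void main(String[] args) throws Exception
  {
    final Set<String> segments = Collections.singleton("segment-1");
    publishAndMaybeCleanUp(s -> true, seg -> { }, segments);            // success path
    publishAndMaybeCleanUp(s -> false, System.out::println, segments);  // definite failure path
  }
}
```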
From 0e09228492040b0d25a70e2d4bf7bcd20921e9de Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Tue, 14 Aug 2018 17:56:59 -0600
Subject: [PATCH 3/5] Updates.

---
 .../metadata/IndexerSQLMetadataStorageCoordinator.java      | 1 +
 .../realtime/appenderator/StreamAppenderatorDriverTest.java | 6 ++++--
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 82b278787721..82e17a13f4d0 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -323,6 +323,7 @@ public SegmentPublishResult inTransaction(
               final TransactionStatus transactionStatus
           ) throws Exception
           {
+            // Set definitelyNotUpdated back to false upon retrying.
             definitelyNotUpdated.set(false);
 
             final Set<DataSegment> inserted = Sets.newHashSet();
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index acdbed52d694..b9c5e223438f 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -30,6 +30,7 @@
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
@@ -53,6 +54,7 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -359,7 +361,7 @@ private Set<SegmentIdentifier> asIdentifiers(Iterable<DataSegment> segments)
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> true;
+    return (segments, commitMetadata) -> new SegmentPublishResult(Collections.emptySet(), true);
   }
 
   static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
@@ -368,7 +370,7 @@ static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithExcept
       if (failWithException) {
         throw new RuntimeException("test");
       }
-      return false;
+      return SegmentPublishResult.fail();
     };
   }
From 77bd38f7de836ca20aff910ff643c60eacbf3973 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Tue, 14 Aug 2018 20:13:34 -0600
Subject: [PATCH 4/5] Fix test.

---
 .../realtime/appenderator/BatchAppenderatorDriverTest.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 4ac344831768..c472f8016215 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -24,13 +24,14 @@
 import com.google.common.collect.ImmutableSet;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.segment.loading.DataSegmentKiller;
 import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -194,6 +195,6 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> true;
+    return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
   }
 }
From a6a939272b46ca22897786e1963c79825c97f69d Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Wed, 15 Aug 2018 12:55:40 -0400
Subject: [PATCH 5/5] Fix tests.

---
 .../StreamAppenderatorDriverFailTest.java     | 56 ++++++++++---------
 1 file changed, 29 insertions(+), 27 deletions(-)

diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 4d512e110804..1c7abb1f84e3 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -239,8 +239,7 @@ public void testFailDuringPublish() throws Exception
   {
     expectedException.expect(ExecutionException.class);
     expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
-    expectedException.expectMessage(
-        "Failed to publish segments[[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo', binaryVersion='0'}, DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo', binaryVersion='0'}]]");
+    expectedException.expectMessage("Failed to publish segments.");
 
     testFailDuringPublishInternal(false);
   }
@@ -279,31 +278,34 @@ private void testFailDuringPublishInternal(boolean failWithException) throws Exc
       Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
 
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
-
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
+    if (!failWithException) {
+      // Should only kill segments if there was _no_ exception.
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+    }
 
     EasyMock.replay(dataSegmentKiller);
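The test fix above follows a common strict-mock pattern: register expectations conditionally so they mirror the production rule, here that cleanup runs only when there is no exception. A minimal sketch of that pattern (assumes EasyMock on the classpath; Killer is a stand-in interface, not the real DataSegmentKiller):

```java
import org.easymock.EasyMock;

public class ConditionalExpectationSketch
{
  interface Killer
  {
    void killQuietly(String segment);
  }

  public static void main(String[] args)
  {
    final boolean failWithException = true; // the scenario under test
    final Killer killer = EasyMock.createStrictMock(Killer.class);

    if (!failWithException) {
      // Should only kill segments if there was _no_ exception.
      killer.killQuietly("segment-1");
      EasyMock.expectLastCall().once();
    }
    EasyMock.replay(killer);

    // ... exercise the code under test here ...

    // verify() fails if killQuietly ran in the exception scenario.
    EasyMock.verify(killer);
  }
}
```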