diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index ca16a5f0f85b..0ce192c11b8e 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -122,7 +122,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| |`buildV9Directly`|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)| |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| -|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| +|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. This option is deprecated. Use `completionTimeout` of KafkaSupervisorIOConfig instead.|no (default == 0)| |`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)| |`workerThreads`|Integer|The number of threads that will be used by the supervisor for asynchronous operations.|no (default == min(10, taskCount))| |`chatThreads`|Integer|The number of threads that will be used for communicating with indexing tasks.|no (default == min(10, taskCount * replicas))| diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index 4e72e993c162..db8be91e57eb 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -120,6 +120,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |buildV9Directly|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|true|no| |forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no| |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| +|publishTimeout|Milliseconds to wait for publishing segments. 
It must be >= 0, where 0 means to wait forever.|0|no| #### IndexSpec diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index cb42ee57f3b7..07e0ae7464b4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -65,9 +65,9 @@ import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver; -import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import io.druid.segment.realtime.firehose.ChatHandler; @@ -92,11 +92,11 @@ import javax.ws.rs.core.Response; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; -import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -434,13 +434,20 @@ public void run() if (!ioConfig.getMinimumMessageTime().isPresent() || !ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp())) { - final SegmentIdentifier identifier = driver.add( + final String sequenceName = sequenceNames.get(record.partition()); + final AppenderatorDriverAddResult addResult = driver.add( row, - sequenceNames.get(record.partition()), + sequenceName, committerSupplier ); - if (identifier == null) { + if (addResult.isOk()) { + // If the number of rows in the segment exceeds the threshold after adding a row, + // move the segment out from the active segments of AppenderatorDriver to make a new segment. + if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) { + driver.moveSegmentOut(sequenceName, ImmutableList.of(addResult.getSegmentIdentifier())); + } + } else { // Failure to allocate segment puts determinism at risk, bail out to be safe. // May want configurable behavior here at some point. // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. @@ -491,48 +498,59 @@ public void run() status = Status.PUBLISHING; } - final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher() - { - @Override - public boolean publishSegments(Set segments, Object commitMetadata) throws IOException - { - final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( - ((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS), - KafkaPartitions.class - ); + final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> { + final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( + ((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS), + KafkaPartitions.class + ); - // Sanity check, we should only be publishing things that match our desired end state. - if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) { - throw new ISE("WTF?! 
Driver attempted to publish invalid metadata[%s].", commitMetadata); - } + // Sanity check, we should only be publishing things that match our desired end state. + if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) { + throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata); + } - final SegmentTransactionalInsertAction action; + final SegmentTransactionalInsertAction action; - if (ioConfig.isUseTransaction()) { - action = new SegmentTransactionalInsertAction( - segments, - new KafkaDataSourceMetadata(ioConfig.getStartPartitions()), - new KafkaDataSourceMetadata(finalPartitions) - ); - } else { - action = new SegmentTransactionalInsertAction(segments, null, null); - } + if (ioConfig.isUseTransaction()) { + action = new SegmentTransactionalInsertAction( + segments, + new KafkaDataSourceMetadata(ioConfig.getStartPartitions()), + new KafkaDataSourceMetadata(finalPartitions) + ); + } else { + action = new SegmentTransactionalInsertAction(segments, null, null); + } - log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction()); + log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction()); - return toolbox.getTaskActionClient().submit(action).isSuccess(); - } + return toolbox.getTaskActionClient().submit(action).isSuccess(); }; - final SegmentsAndMetadata published = driver.finish(publisher, committerSupplier.get()); - if (published == null) { + // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting + // for hand off. See KafkaSupervisorIOConfig.completionTimeout. + final SegmentsAndMetadata published = driver.publish( + publisher, + committerSupplier.get(), + sequenceNames.values() + ).get(); + + final SegmentsAndMetadata handedOff; + if (tuningConfig.getHandoffConditionTimeout() == 0) { + handedOff = driver.registerHandoff(published) + .get(); + } else { + handedOff = driver.registerHandoff(published) + .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); + } + + if (handedOff == null) { throw new ISE("Transaction failure publishing segments, aborting"); } else { log.info( "Published segments[%s] with metadata[%s].", Joiner.on(", ").join( Iterables.transform( - published.getSegments(), + handedOff.getSegments(), new Function() { @Override @@ -543,7 +561,7 @@ public String apply(DataSegment input) } ) ), - published.getCommitMetadata() + handedOff.getCommitMetadata() ); } } @@ -865,8 +883,6 @@ private FiniteAppenderatorDriver newDriver( toolbox.getSegmentHandoffNotifierFactory(), new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), toolbox.getObjectMapper(), - tuningConfig.getMaxRowsPerSegment(), - tuningConfig.getHandoffConditionTimeout(), metrics ); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index fa721f2bd7aa..d257d7c98ec5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -42,6 +42,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig private final IndexSpec indexSpec; private final boolean buildV9Directly; private final boolean reportParseExceptions; + @Deprecated private final long handoffConditionTimeout; private final boolean resetOffsetAutomatically; @@ 
-152,6 +153,7 @@ public boolean isReportParseExceptions() return reportParseExceptions; } + @Deprecated @JsonProperty public long getHandoffConditionTimeout() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 0e28d4abcb23..c04771a6ae69 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -44,7 +44,7 @@ public KafkaSupervisorTuningConfig( @JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, - @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, + @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, // for backward compatibility @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically, @JsonProperty("workerThreads") Integer workerThreads, @JsonProperty("chatThreads") Integer chatThreads, @@ -62,6 +62,8 @@ public KafkaSupervisorTuningConfig( indexSpec, buildV9Directly, reportParseExceptions, + // Supervised kafka tasks should respect KafkaSupervisorIOConfig.completionTimeout instead of + // handoffConditionTimeout handoffConditionTimeout, resetOffsetAutomatically ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java index 70d2766ef81e..d42b44c5475d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -104,7 +104,18 @@ public void testSerdeWithNonDefaults() throws Exception @Test public void testCopyOf() throws Exception { - KafkaTuningConfig original = new KafkaTuningConfig(1, 2, new Period("PT3S"), new File("/tmp/xxx"), 4, new IndexSpec(), true, true, 5L, null); + KafkaTuningConfig original = new KafkaTuningConfig( + 1, + 2, + new Period("PT3S"), + new File("/tmp/xxx"), + 4, + new IndexSpec(), + true, + true, + 5L, + null + ); KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original); Assert.assertEquals(1, copy.getMaxRowsInMemory()); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index a39a2238c79b..6a826d860545 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -30,6 +30,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; @@ -74,6 +75,7 @@ import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.AppenderatorConfig; +import 
io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver; import io.druid.segment.realtime.appenderator.SegmentAllocator; @@ -100,6 +102,9 @@ import java.util.Map; import java.util.Set; import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class IndexTask extends AbstractTask { @@ -451,9 +456,9 @@ public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, Strin sequenceNameToShardSpecMap.put(sequenceName, shardSpecForPublishing); } - final SegmentIdentifier identifier = driver.add(inputRow, sequenceName, committerSupplier); + final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier); - if (identifier == null) { + if (!addResult.isOk()) { throw new ISE("Could not allocate segment for row with timestamp[%s]", inputRow.getTimestamp()); } @@ -482,7 +487,22 @@ public boolean publishSegments(Set segments, Object commitMetadata) } }; - final SegmentsAndMetadata published = driver.finish(publisher, committerSupplier.get()); + final SegmentsAndMetadata published; + final long publishTimeout = ingestionSchema.getTuningConfig().getPublishTimeout(); + if (publishTimeout == 0) { + published = driver.publish( + publisher, + committerSupplier.get(), + sequenceNameToShardSpecMap.keySet() + ).get(); + } else { + published = driver.publish( + publisher, + committerSupplier.get(), + sequenceNameToShardSpecMap.keySet() + ).get(publishTimeout, TimeUnit.MILLISECONDS); + } + if (published == null) { log.error("Failed to publish segments, aborting!"); return false; @@ -505,6 +525,9 @@ public String apply(DataSegment input) return true; } } + catch (TimeoutException | ExecutionException e) { + throw Throwables.propagate(e); + } } private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox, DataSchema dataSchema) @@ -533,8 +556,6 @@ private FiniteAppenderatorDriver newDriver( new NoopSegmentHandoffNotifierFactory(), // don't wait for handoff since we don't serve queries new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), toolbox.getObjectMapper(), - Integer.MAX_VALUE, // rows for a partition is already determined by the shardSpec - 0, metrics ); } @@ -558,7 +579,7 @@ public IndexIngestionSpec( this.ioConfig = ioConfig; this.tuningConfig = tuningConfig == null ? 
- new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null) + new IndexTuningConfig(null, null, null, null, null, null, null, null, null, (File) null) : tuningConfig; } @@ -624,6 +645,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private static final boolean DEFAULT_BUILD_V9_DIRECTLY = true; private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false; private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; + private static final long DEFAULT_PUBLISH_TIMEOUT = 0; static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000; @@ -636,6 +658,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private final boolean buildV9Directly; private final boolean forceExtendableShardSpecs; private final boolean reportParseExceptions; + private final long publishTimeout; @JsonCreator public IndexTuningConfig( @@ -647,7 +670,8 @@ public IndexTuningConfig( @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs, - @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions + @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, + @JsonProperty("publishTimeout") @Nullable Long publishTimeout ) { this( @@ -659,6 +683,7 @@ public IndexTuningConfig( buildV9Directly, forceExtendableShardSpecs, reportParseExceptions, + publishTimeout, null ); } @@ -672,6 +697,7 @@ private IndexTuningConfig( @Nullable Boolean buildV9Directly, @Nullable Boolean forceExtendableShardSpecs, @Nullable Boolean reportParseExceptions, + @Nullable Long publishTimeout, @Nullable File basePersistDirectory ) { @@ -696,6 +722,7 @@ private IndexTuningConfig( this.reportParseExceptions = reportParseExceptions == null ? DEFAULT_REPORT_PARSE_EXCEPTIONS : reportParseExceptions; + this.publishTimeout = publishTimeout == null ? 
DEFAULT_PUBLISH_TIMEOUT : publishTimeout; this.basePersistDirectory = basePersistDirectory; } @@ -710,6 +737,7 @@ public IndexTuningConfig withBasePersistDirectory(File dir) buildV9Directly, forceExtendableShardSpecs, reportParseExceptions, + publishTimeout, dir ); } @@ -772,6 +800,12 @@ public boolean isForceExtendableShardSpecs() return forceExtendableShardSpecs; } + @JsonProperty + public long getPublishTimeout() + { + return publishTimeout; + } + @Override public Period getIntermediatePersistPeriod() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 94b3459f3921..282a6740cc10 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -606,7 +606,8 @@ private IndexTask.IndexIngestionSpec createIngestionSpec( null, true, forceExtendableShardSpecs, - true + true, + null ) ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 515b3913f2f5..b74780fc38a3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -186,7 +186,7 @@ public void testIndexTaskSerde() throws Exception jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true) + new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true, null) ), null, jsonMapper @@ -250,7 +250,7 @@ public void testIndexTaskwithResourceSerde() throws Exception jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) ), null, jsonMapper diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 6fe8e11d95bf..39064903942d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -649,7 +649,7 @@ public void testIndexTask() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) ), null, MAPPER @@ -707,7 +707,7 @@ public void testIndexTaskFailure() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) ), null, MAPPER @@ -1066,7 +1066,7 @@ public void testResumeTasks() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null) + new 
IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null, null) ), null, MAPPER diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index b95139bc9b03..99b639da3685 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -133,7 +133,6 @@ public RealtimeTuningConfig( this.reportParseExceptions = reportParseExceptions == null ? defaultReportParseExceptions : reportParseExceptions; - this.handoffConditionTimeout = handoffConditionTimeout == null ? defaultHandoffConditionTimeout : handoffConditionTimeout; diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java index f869ab6894c7..abfdc9f828f3 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java @@ -27,6 +27,7 @@ import io.druid.segment.incremental.IndexSizeExceededException; import java.io.Closeable; +import java.util.Collection; import java.util.List; /** @@ -61,7 +62,7 @@ public interface Appenderator extends QuerySegmentWalker, Closeable * Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used * asynchronously. *
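+   * (A minimal usage sketch, assuming a single ingestion thread and a caller-provided
+   * {@code committerSupplier}; the names here are illustrative, not part of this interface:
+   * {@code
+   *   int numRowsInSegment = appenderator.add(identifier, row, committerSupplier);
+   *   ListenableFuture<Object> persistFuture = appenderator.persistAll(committerSupplier.get());
+   * }
+   * Passing a Supplier lets the Committer be created at the moment of the add() call, even though it is
+   * used asynchronously later.)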

- * The add, clear, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread. * * @param identifier the segment into which this row should be added * @param row the row to add @@ -96,7 +97,7 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe * for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been * cleared. This may take some time, since all pending persists must finish first. * - * The add, clear, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread. */ void clear() throws InterruptedException; @@ -114,27 +115,47 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe */ ListenableFuture drop(SegmentIdentifier identifier); + /** + * Persist any in-memory indexed data for segments of the given identifiers to durable storage. This may be only + * somewhat durable, e.g. the machine's local disk. The Committer will be made synchronously with the call to + * persist, but will actually be used asynchronously. Any metadata returned by the committer will be associated with + * the data persisted to disk. + *
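+   * (A minimal sketch of a selective persist, assuming the caller tracks which identifiers it cares about:
+   * {@code
+   *   Collection<SegmentIdentifier> ids = appenderator.getSegments();  // or some subset
+   *   Object commitMetadata = appenderator.persist(ids, committerSupplier.get()).get();
+   * }
+   * Only the listed identifiers are persisted, which is what allows {@code push} to persist just the
+   * segments it is about to push.)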

+ * The add, clear, persist, persistAll, and push methods should all be called from the same thread. + * + * @param identifiers segment identifiers to be persisted + * @param committer a committer associated with all data that has been added to segments of the given identifiers so + * far + * + * @return future that resolves when all pending data for segments of the given identifiers has been persisted, + * contains commit metadata for this persist + */ + ListenableFuture persist(Collection identifiers, Committer committer); + /** * Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the - * machine's local disk. The Committer will be made synchronously will the call to persistAll, but will actually + * machine's local disk. The Committer will be made synchronously with the call to persistAll, but will actually * be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to * disk. *
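+   * (Given the default implementation below, {@code persistAll(committer)} is equivalent to
+   * {@code persist(getSegments(), committer)}, i.e. a persist over every segment this appenderator
+   * currently tracks.)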

- * The add, clear, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread. * * @param committer a committer associated with all data that has been added so far * * @return future that resolves when all pending data has been persisted, contains commit metadata for this persist */ - ListenableFuture persistAll(Committer committer); + default ListenableFuture persistAll(Committer committer) + { + return persist(getSegments(), committer); + } /** - * Merge and push particular segments to deep storage. This will trigger an implicit {@link #persistAll(Committer)} - * using the provided Committer. + * Merge and push particular segments to deep storage. This will trigger an implicit + * {@link #persist(Collection, Committer)} using the provided Committer. *

* After this method is called, you cannot add new data to any segments that were previously under construction. *
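+   * (In {@code AppenderatorImpl} the implicit persist now covers exactly the pushed identifiers,
+   * via {@code persist(identifiers, committer)} rather than {@code persistAll(committer)}, so segments
+   * outside this push are not forced to persist along the way.)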

- * The add, clear, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread. * * @param identifiers list of segments to push * @param committer a committer associated with all data that has been added so far @@ -142,7 +163,7 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe * @return future that resolves when all segments have been pushed. The segment list will be the list of segments * that have been pushed and the commit metadata from the Committer. */ - ListenableFuture push(List identifiers, Committer committer); + ListenableFuture push(Collection identifiers, Committer committer); /** * Stop any currently-running processing and clean up after ourselves. This will not remove any on-disk persisted diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java new file mode 100644 index 000000000000..6d0ee1b5e581 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java @@ -0,0 +1,66 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.google.common.base.Supplier; +import io.druid.data.input.InputRow; + +import javax.annotation.Nullable; + +/** + * Result of {@link FiniteAppenderatorDriver#add(InputRow, String, Supplier)}. It contains the identifier of the + * segment to which the InputRow was added and the number of rows in that segment.
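+ * (A minimal sketch of the intended call pattern, mirroring the updated {@code KafkaIndexTask};
+ * {@code maxRowsPerSegment} stands in for the caller's tuning value:
+ * {@code
+ *   AppenderatorDriverAddResult addResult = driver.add(row, sequenceName, committerSupplier);
+ *   if (addResult.isOk()) {
+ *     if (addResult.getNumRowsInSegment() > maxRowsPerSegment) {
+ *       driver.moveSegmentOut(sequenceName, ImmutableList.of(addResult.getSegmentIdentifier()));
+ *     }
+ *   } else {
+ *     // segment allocation failed; callers typically bail out to keep ingestion deterministic
+ *   }
+ * })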
+ */ +public class AppenderatorDriverAddResult +{ + private final SegmentIdentifier segmentIdentifier; + private final int numRowsInSegment; + + public static AppenderatorDriverAddResult ok(SegmentIdentifier segmentIdentifier, int numRowsInSegment) + { + return new AppenderatorDriverAddResult(segmentIdentifier, numRowsInSegment); + } + + public static AppenderatorDriverAddResult fail() + { + return new AppenderatorDriverAddResult(null, 0); + } + + private AppenderatorDriverAddResult(@Nullable SegmentIdentifier segmentIdentifier, int numRowsInSegment) + { + this.segmentIdentifier = segmentIdentifier; + this.numRowsInSegment = numRowsInSegment; + } + + public boolean isOk() + { + return segmentIdentifier != null; + } + + public SegmentIdentifier getSegmentIdentifier() + { + return segmentIdentifier; + } + + public int getNumRowsInSegment() + { + return numRowsInSegment; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 1c3e2794b94d..c2bfdf1e9b93 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -80,6 +80,7 @@ import java.nio.channels.FileLock; import java.nio.file.StandardOpenOption; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -346,13 +347,10 @@ public ListenableFuture drop(final SegmentIdentifier identifier) } @Override - public ListenableFuture persistAll(final Committer committer) + public ListenableFuture persist(Collection identifiers, Committer committer) { - // Submit persistAll task to the persistExecutor - final Map commitHydrants = Maps.newHashMap(); final List> indexesToPersist = Lists.newArrayList(); - final Set identifiers = sinks.keySet(); for (SegmentIdentifier identifier : identifiers) { final Sink sink = sinks.get(identifier); final List hydrants = Lists.newArrayList(sink); @@ -437,9 +435,16 @@ public String apply(Map.Entry entry) return future; } + @Override + public ListenableFuture persistAll(final Committer committer) + { + // Submit persistAll task to the persistExecutor + return persist(sinks.keySet(), committer); + } + @Override public ListenableFuture push( - final List identifiers, + final Collection identifiers, final Committer committer ) { @@ -454,7 +459,7 @@ public ListenableFuture push( } return Futures.transform( - persistAll(committer), + persist(identifiers, committer), new Function() { @Override diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java index 8119f81f06ae..10c9c4cdb388 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java @@ -25,7 +25,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -34,8 +33,10 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import 
com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; - +import com.google.common.util.concurrent.SettableFuture; +import io.druid.concurrent.Execs; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.java.util.common.ISE; @@ -44,17 +45,23 @@ import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; -import io.druid.timeline.DataSegment; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * A FiniteAppenderatorDriver drives an Appenderator to index a finite stream of data. This class does not help you @@ -76,20 +83,23 @@ public class FiniteAppenderatorDriver implements Closeable private final SegmentHandoffNotifier handoffNotifier; private final UsedSegmentChecker usedSegmentChecker; private final ObjectMapper objectMapper; - private final int maxRowsPerSegment; - private final long handoffConditionTimeout; private final FireDepartmentMetrics metrics; - // All access to "activeSegments" and "lastSegmentId" must be synchronized on "activeSegments". + // All access to "activeSegments", "publishPendingSegments", and "lastSegmentId" must be synchronized on + // "activeSegments". // sequenceName -> start of segment interval -> segment we're currently adding data to private final Map> activeSegments = new TreeMap<>(); + // sequenceName -> list of identifiers of segments waiting for being published + // publishPendingSegments is always a super set of activeSegments because there can be some segments to which data + // are not added anymore, but not published yet. + private final Map> publishPendingSegments = new HashMap<>(); + // sequenceName -> most recently allocated segment private final Map lastSegmentIds = Maps.newHashMap(); - // Notified when segments are dropped. - private final Object handoffMonitor = new Object(); + private final ListeningExecutorService publishExecutor; /** * Create a driver. @@ -99,9 +109,6 @@ public class FiniteAppenderatorDriver implements Closeable * @param handoffNotifierFactory handoff notifier factory * @param usedSegmentChecker used segment checker * @param objectMapper object mapper, used for serde of commit metadata - * @param maxRowsPerSegment maximum number of rows allowed in an entire segment (not a single persist) - * @param handoffConditionTimeout maximum number of millis allowed for handoff (not counting push/publish), zero - * means wait forever. 
* @param metrics Firedepartment metrics */ public FiniteAppenderatorDriver( @@ -110,8 +117,6 @@ public FiniteAppenderatorDriver( SegmentHandoffNotifierFactory handoffNotifierFactory, UsedSegmentChecker usedSegmentChecker, ObjectMapper objectMapper, - int maxRowsPerSegment, - long handoffConditionTimeout, FireDepartmentMetrics metrics ) { @@ -121,9 +126,8 @@ public FiniteAppenderatorDriver( .createSegmentHandoffNotifier(appenderator.getDataSource()); this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker"); this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper"); - this.maxRowsPerSegment = maxRowsPerSegment; - this.handoffConditionTimeout = handoffConditionTimeout; this.metrics = Preconditions.checkNotNull(metrics, "metrics"); + this.publishExecutor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d")); } /** @@ -151,13 +155,14 @@ public Object startJob() final String sequenceName = entry.getKey(); final TreeMap segmentMap = Maps.newTreeMap(); - lastSegmentIds.put(sequenceName, metadata.getLastSegmentIds().get(sequenceName)); activeSegments.put(sequenceName, segmentMap); for (SegmentIdentifier identifier : entry.getValue()) { segmentMap.put(identifier.getInterval().getStartMillis(), identifier); } } + publishPendingSegments.putAll(metadata.getPublishPendingSegments()); + lastSegmentIds.putAll(metadata.getLastSegmentIds()); } return metadata.getCallerMetadata(); @@ -166,6 +171,18 @@ public Object startJob() } } + private void addSegment(String sequenceName, SegmentIdentifier identifier) + { + synchronized (activeSegments) { + activeSegments.computeIfAbsent(sequenceName, k -> new TreeMap<>()) + .putIfAbsent(identifier.getInterval().getStartMillis(), identifier); + + publishPendingSegments.computeIfAbsent(sequenceName, k -> new ArrayList<>()) + .add(identifier); + lastSegmentIds.put(sequenceName, identifier.getIdentifierAsString()); + } + } + /** * Clears out all our state and also calls {@link Appenderator#clear()} on the underlying Appenderator. */ @@ -188,7 +205,7 @@ public void clear() throws InterruptedException * * @throws IOException if there is an I/O error while allocating or writing to a segment */ - public SegmentIdentifier add( + public AppenderatorDriverAddResult add( final InputRow row, final String sequenceName, final Supplier committerSupplier @@ -203,16 +220,14 @@ public SegmentIdentifier add( if (identifier != null) { try { final int numRows = appenderator.add(identifier, row, wrapCommitterSupplier(committerSupplier)); - if (numRows >= maxRowsPerSegment) { - moveSegmentOut(sequenceName, ImmutableList.of(identifier)); - } + return AppenderatorDriverAddResult.ok(identifier, numRows); } catch (SegmentNotWritableException e) { throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", identifier); } + } else { + return AppenderatorDriverAddResult.fail(); } - - return identifier; } /** @@ -242,59 +257,87 @@ public Object persist(final Committer committer) throws InterruptedException } /** - * Publish all data indexed through this driver so far, and waits for it to be handed off. Blocks until complete. - * Retries forever on transient failures, but may exit early on permanent failures. + * Register the segments in the given {@link SegmentsAndMetadata} to be handed off and execute a background task which + * waits until the hand off completes. 
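+ * (A minimal sketch of the publish-then-handoff chain, mirroring {@code KafkaIndexTask}; {@code timeout}
+ * stands in for the caller's handoff timeout:
+ * {@code
+ *   SegmentsAndMetadata published = driver.publish(publisher, committerSupplier.get(), sequenceNames).get();
+ *   SegmentsAndMetadata handedOff = timeout == 0
+ *       ? driver.registerHandoff(published).get()
+ *       : driver.registerHandoff(published).get(timeout, TimeUnit.MILLISECONDS);
+ * })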
* - * Should be called after all data has been added and persisted through {@link #add(InputRow, String, Supplier)} and - * {@link #persist(Committer)}. - * - * @param publisher publisher to use for this set of segments - * @param committer committer representing all data that has been added so far + * @param segmentsAndMetadata the result segments and metadata of + * {@link #publish(TransactionalSegmentPublisher, Committer, Collection)} * - * @return segments and metadata published if successful, or null if segments could not be handed off due to - * transaction failure with commit metadata. + * @return null if the input segmentsAndMetadata is null. Otherwise, a {@link ListenableFuture} for the submitted task + * which returns {@link SegmentsAndMetadata} containing the segments successfully handed off and the metadata + * of the caller of {@link FiniteAppenderatorDriverMetadata} */ - public SegmentsAndMetadata finish( - final TransactionalSegmentPublisher publisher, - final Committer committer - ) throws InterruptedException + public ListenableFuture registerHandoff(SegmentsAndMetadata segmentsAndMetadata) { - final SegmentsAndMetadata segmentsAndMetadata = publishAll(publisher, wrapCommitter(committer)); - - if (segmentsAndMetadata != null) { - final long giveUpAt = handoffConditionTimeout > 0 - ? System.currentTimeMillis() + handoffConditionTimeout - : 0; + if (segmentsAndMetadata == null) { + return Futures.immediateFuture(null); - log.info("Awaiting handoff of segments: [%s]", Joiner.on(", ").join(appenderator.getSegments())); + } else { + final List waitingSegmentIdList = segmentsAndMetadata.getSegments().stream() + .map(SegmentIdentifier::fromDataSegment) + .collect(Collectors.toList()); + + if (waitingSegmentIdList.isEmpty()) { + return Futures.immediateFuture( + new SegmentsAndMetadata( + segmentsAndMetadata.getSegments(), + ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) + .getCallerMetadata() + ) + ); + } - synchronized (handoffMonitor) { - while (!appenderator.getSegments().isEmpty()) { + log.info("Register handoff of segments: [%s]", waitingSegmentIdList); + + final SettableFuture resultFuture = SettableFuture.create(); + final AtomicInteger numRemainingHandoffSegments = new AtomicInteger(waitingSegmentIdList.size()); + + for (final SegmentIdentifier segmentIdentifier : waitingSegmentIdList) { + handoffNotifier.registerSegmentHandoffCallback( + new SegmentDescriptor( + segmentIdentifier.getInterval(), + segmentIdentifier.getVersion(), + segmentIdentifier.getShardSpec().getPartitionNum() + ), + MoreExecutors.sameThreadExecutor(), + () -> { + log.info("Segment[%s] successfully handed off, dropping.", segmentIdentifier); + metrics.incrementHandOffCount(); + + final ListenableFuture dropFuture = appenderator.drop(segmentIdentifier); + Futures.addCallback( + dropFuture, + new FutureCallback() + { + @Override + public void onSuccess(Object result) + { + if (numRemainingHandoffSegments.decrementAndGet() == 0) { + log.info("All segments handed off."); + resultFuture.set( + new SegmentsAndMetadata( + segmentsAndMetadata.getSegments(), + ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) + .getCallerMetadata() + ) + ); + } + } - if (giveUpAt == 0) { - handoffMonitor.wait(); - } else { - final long remaining = giveUpAt - System.currentTimeMillis(); - if (remaining > 0) { - handoffMonitor.wait(remaining); - } else { - throw new ISE( - "Segment handoff wait timeout. 
Segments not yet handed off: [%s]", - Joiner.on(", ").join(appenderator.getSegments()) + @Override + public void onFailure(Throwable e) + { + log.warn(e, "Failed to drop segment[%s]?!", segmentIdentifier); + numRemainingHandoffSegments.decrementAndGet(); + resultFuture.setException(e); + } + } ); } - } - } + ); } - log.info("All segments handed off."); - - return new SegmentsAndMetadata( - segmentsAndMetadata.getSegments(), - ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() - ); - } else { - return null; + return resultFuture; } } @@ -304,6 +347,7 @@ public SegmentsAndMetadata finish( @Override public void close() { + publishExecutor.shutdownNow(); handoffNotifier.close(); } @@ -343,7 +387,6 @@ private SegmentIdentifier getSegment(final DateTime timestamp, final String sequ return existing; } else { // Allocate new segment. - final NavigableMap activeSegmentsForSequence = activeSegments.get(sequenceName); final SegmentIdentifier newSegment = segmentAllocator.allocate( timestamp, sequenceName, @@ -351,8 +394,6 @@ private SegmentIdentifier getSegment(final DateTime timestamp, final String sequ ); if (newSegment != null) { - final Long key = newSegment.getInterval().getStartMillis(); - for (SegmentIdentifier identifier : appenderator.getSegments()) { if (identifier.equals(newSegment)) { throw new ISE( @@ -364,11 +405,7 @@ private SegmentIdentifier getSegment(final DateTime timestamp, final String sequ } log.info("New segment[%s] for sequenceName[%s].", newSegment, sequenceName); - if (activeSegmentsForSequence == null) { - activeSegments.put(sequenceName, Maps.newTreeMap()); - } - activeSegments.get(sequenceName).put(key, newSegment); - lastSegmentIds.put(sequenceName, newSegment.getIdentifierAsString()); + addSegment(sequenceName, newSegment); } else { // Well, we tried. log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", timestamp, sequenceName); @@ -382,7 +419,7 @@ private SegmentIdentifier getSegment(final DateTime timestamp, final String sequ /** * Move a set of identifiers out from "active", making way for newer segments. */ - private void moveSegmentOut(final String sequenceName, final List identifiers) + public void moveSegmentOut(final String sequenceName, final List identifiers) { synchronized (activeSegments) { final NavigableMap activeSegmentsForSequence = activeSegments.get(sequenceName); @@ -393,7 +430,7 @@ private void moveSegmentOut(final String sequenceName, final List publish( final TransactionalSegmentPublisher publisher, - final Committer wrappedCommitter - ) throws InterruptedException + final Committer committer, + final Collection sequenceNames + ) { - final List theSegments = ImmutableList.copyOf(appenderator.getSegments()); - - long nTry = 0; - while (true) { - try { - log.info("Pushing segments: [%s]", Joiner.on(", ").join(theSegments)); - final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(theSegments, wrappedCommitter).get(); - - // Sanity check - if (!segmentsToIdentifiers(segmentsAndMetadata.getSegments()).equals(Sets.newHashSet(theSegments))) { - throw new ISE( - "WTF?! Pushed different segments than requested. 
Pushed[%s], requested[%s].", - Joiner.on(", ").join(identifiersToStrings(segmentsToIdentifiers(segmentsAndMetadata.getSegments()))), - Joiner.on(", ").join(identifiersToStrings(theSegments)) - ); - } + final List theSegments; + synchronized (activeSegments) { + theSegments = sequenceNames.stream() + .map(publishPendingSegments::get) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } - log.info( - "Publishing segments with commitMetadata[%s]: [%s]", - segmentsAndMetadata.getCommitMetadata(), - Joiner.on(", ").join(segmentsAndMetadata.getSegments()) - ); + final ListenableFuture publishFuture = publish( + publisher, + wrapCommitter(committer), + theSegments + ); - if (segmentsAndMetadata.getSegments().isEmpty()) { - log.info("Nothing to publish, skipping publish step."); - } else { - final boolean published = publisher.publishSegments( - ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), - ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() - ); - - if (published) { - log.info("Published segments, awaiting handoff."); - } else { - log.info("Transaction failure while publishing segments, checking if someone else beat us to it."); - if (usedSegmentChecker.findUsedSegments(segmentsToIdentifiers(segmentsAndMetadata.getSegments())) - .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) { - log.info("Our segments really do exist, awaiting handoff."); - } else { - log.warn("Our segments don't exist, giving up."); - return null; + Futures.addCallback( + publishFuture, + new FutureCallback() + { + @Override + public void onSuccess(@Nullable SegmentsAndMetadata result) + { + if (result != null) { + synchronized (activeSegments) { + // Remove sequenceName from both publishPendingSemgments and activeSegments + sequenceNames.forEach( + sequenceName -> { + activeSegments.remove(sequenceName); + publishPendingSegments.remove(sequenceName); + } + ); + } } } + + @Override + public void onFailure(Throwable t) + { + // The throwable is propagated anyway when get() is called on the future. + // See FiniteAppenderatorFailTest.testInterruptDuringPush(). + log.error(t, "Failed to publish segments[%s]", theSegments); + } } + ); - for (final DataSegment dataSegment : segmentsAndMetadata.getSegments()) { - handoffNotifier.registerSegmentHandoffCallback( - new SegmentDescriptor( - dataSegment.getInterval(), - dataSegment.getVersion(), - dataSegment.getShardSpec().getPartitionNum() - ), - MoreExecutors.sameThreadExecutor(), - new Runnable() - { - @Override - public void run() - { - final SegmentIdentifier identifier = SegmentIdentifier.fromDataSegment(dataSegment); - log.info("Segment[%s] successfully handed off, dropping.", identifier); - metrics.incrementHandOffCount(); - final ListenableFuture dropFuture = appenderator.drop(identifier); - Futures.addCallback( - dropFuture, - new FutureCallback() - { - @Override - public void onSuccess(Object result) - { - synchronized (handoffMonitor) { - handoffMonitor.notifyAll(); - } - } - - @Override - public void onFailure(Throwable e) - { - log.warn(e, "Failed to drop segment[%s]?!"); - synchronized (handoffMonitor) { - handoffMonitor.notifyAll(); - } - } - } - ); + return publishFuture; + } + + /** + * Execute a task in background to publish the given segments. The task blocks until complete. + * Retries forever on transient failures, but may exit early on permanent failures. 
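+ * (Retries back off exponentially via {@code computeNextRetrySleep}, roughly
+ * {@code min(maxSleepMillis, baseSleepMillis * 2^nTry)} times a random fuzz factor, so a brief
+ * transient failure delays the publish rather than failing the task.)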
+ * + * Should be called after all data has been added through {@link #add(InputRow, String, Supplier)}. + * + * @param publisher publisher to use for this set of segments + * @param wrappedCommitter committer representing all data that has been added so far + * + * @return segments and metadata published if successful, or null if segments could not be handed off due to + * transaction failure with commit metadata. + */ + private ListenableFuture publish( + final TransactionalSegmentPublisher publisher, + final WrappedCommitter wrappedCommitter, + final List segmentIdentifiers + ) + { + return publishExecutor.submit( + () -> { + long nTry = 0; + while (true) { + try { + log.info("Pushing segments: [%s]", Joiner.on(", ").join(segmentIdentifiers)); + final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(segmentIdentifiers, wrappedCommitter) + .get(); + + // Sanity check + final Set pushedSegments = segmentsAndMetadata.getSegments().stream() + .map(SegmentIdentifier::fromDataSegment) + .collect(Collectors.toSet()); + if (!pushedSegments.equals(Sets.newHashSet(segmentIdentifiers))) { + throw new ISE( + "WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].", + pushedSegments, + segmentIdentifiers + ); + } + + log.info( + "Publishing segments with commitMetadata[%s]: [%s]", + segmentsAndMetadata.getCommitMetadata(), + Joiner.on(", ").join(segmentsAndMetadata.getSegments()) + ); + + if (segmentsAndMetadata.getSegments().isEmpty()) { + log.info("Nothing to publish, skipping publish step."); + } else { + final boolean published = publisher.publishSegments( + ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), + ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() + ); + + if (published) { + log.info("Published segments, awaiting handoff."); + } else { + log.info("Transaction failure while publishing segments, checking if someone else beat us to it."); + if (usedSegmentChecker.findUsedSegments(pushedSegments) + .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) { + log.info("Our segments really do exist, awaiting handoff."); + } else { + log.warn("Our segments don't exist, giving up."); + return null; + } } } - ); + + return segmentsAndMetadata; + } + catch (InterruptedException e) { + throw e; + } + catch (Exception e) { + final long sleepMillis = computeNextRetrySleep(++nTry); + log.warn(e, "Failed publish (try %d), retrying in %,dms.", nTry, sleepMillis); + Thread.sleep(sleepMillis); + } + } } + ); + } - return segmentsAndMetadata; - } - catch (InterruptedException e) { - throw e; - } - catch (Exception e) { - final long sleepMillis = computeNextRetrySleep(++nTry); - log.warn(e, "Failed publishAll (try %d), retrying in %,dms.", nTry, sleepMillis); - Thread.sleep(sleepMillis); - } - } + public ListenableFuture publishAndRegisterHandoff( + final TransactionalSegmentPublisher publisher, + final Committer committer, + final Collection sequenceNames + ) + { + return Futures.transform( + publish(publisher, committer, sequenceNames), + this::registerHandoff + ); + } + + private interface WrappedCommitter extends Committer + { } private Supplier wrapCommitterSupplier(final Supplier committerSupplier) { - return new Supplier() - { - @Override - public Committer get() - { - return wrapCommitter(committerSupplier.get()); - } - }; + return () -> wrapCommitter(committerSupplier.get()); } - private Committer wrapCommitter(final Committer committer) + private WrappedCommitter wrapCommitter(final Committer committer) { + 
final FiniteAppenderatorDriverMetadata wrappedMetadata; synchronized (activeSegments) { - final FiniteAppenderatorDriverMetadata wrappedMetadata = new FiniteAppenderatorDriverMetadata( + wrappedMetadata = new FiniteAppenderatorDriverMetadata( ImmutableMap.copyOf( Maps.transformValues( activeSegments, @@ -545,25 +623,26 @@ public List apply(NavigableMap input } ) ), + ImmutableMap.copyOf(publishPendingSegments), ImmutableMap.copyOf(lastSegmentIds), committer.getMetadata() ); + } - return new Committer() + return new WrappedCommitter() + { + @Override + public Object getMetadata() { - @Override - public Object getMetadata() - { - return wrappedMetadata; - } + return wrappedMetadata; + } - @Override - public void run() - { - committer.run(); - } - }; - } + @Override + public void run() + { + committer.run(); + } + }; } private static long computeNextRetrySleep(final long nTry) @@ -573,34 +652,4 @@ private static long computeNextRetrySleep(final long nTry) final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * new Random().nextGaussian(), 0), 2); return (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry)) * fuzzyMultiplier); } - - private static Set segmentsToIdentifiers(final Iterable segments) - { - return FluentIterable.from(segments) - .transform( - new Function() - { - @Override - public SegmentIdentifier apply(DataSegment segment) - { - return SegmentIdentifier.fromDataSegment(segment); - } - } - ).toSet(); - } - - private static Iterable identifiersToStrings(final Iterable identifiers) - { - return FluentIterable.from(identifiers) - .transform( - new Function() - { - @Override - public String apply(SegmentIdentifier input) - { - return input.getIdentifierAsString(); - } - } - ); - } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java index 65c7e70c7888..f38229da45e3 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java @@ -28,17 +28,20 @@ public class FiniteAppenderatorDriverMetadata { private final Map> activeSegments; + private final Map> publishPendingSegments; private final Map lastSegmentIds; private final Object callerMetadata; @JsonCreator public FiniteAppenderatorDriverMetadata( @JsonProperty("activeSegments") Map> activeSegments, + @JsonProperty("publishPendingSegments") Map> publishPendingSegments, @JsonProperty("lastSegmentIds") Map lastSegmentIds, @JsonProperty("callerMetadata") Object callerMetadata ) { this.activeSegments = activeSegments; + this.publishPendingSegments = publishPendingSegments; this.lastSegmentIds = lastSegmentIds; this.callerMetadata = callerMetadata; } @@ -49,6 +52,12 @@ public Map> getActiveSegments() return activeSegments; } + @JsonProperty + public Map> getPublishPendingSegments() + { + return publishPendingSegments; + } + @JsonProperty public Map getLastSegmentIds() { @@ -66,6 +75,7 @@ public String toString() { return "FiniteAppenderatorDriverMetadata{" + "activeSegments=" + activeSegments + + ", publishPendingSegments=" + publishPendingSegments + ", lastSegmentIds=" + lastSegmentIds + ", callerMetadata=" + callerMetadata + '}'; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverFailTest.java 
b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverFailTest.java new file mode 100644 index 000000000000..489a1ed2864c --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverFailTest.java @@ -0,0 +1,459 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.data.input.Committer; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.SegmentDescriptor; +import io.druid.segment.incremental.IndexSizeExceededException; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestCommitterSupplier; +import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestSegmentAllocator; +import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestSegmentHandoffNotifierFactory; +import io.druid.timeline.DataSegment; +import org.hamcrest.CoreMatchers; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public class FiniteAppenderatorDriverFailTest +{ + private static final String DATA_SOURCE = "foo"; + private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); + private static final long PUBLISH_TIMEOUT = 1000; + + private static final List ROWS = ImmutableList.of( + new MapBasedInputRow( + new DateTime("2000"), + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", "1") + ), + new MapBasedInputRow( + new DateTime("2000T01"), 
+  SegmentAllocator allocator;
+  TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
+  FiniteAppenderatorDriver driver;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Before
+  public void setUp()
+  {
+    allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
+    segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    if (driver != null) {
+      driver.clear();
+      driver.close();
+    }
+  }
+
+  @Test
+  public void testFailDuringPersist() throws IOException, InterruptedException, TimeoutException, ExecutionException
+  {
+    expectedException.expect(TimeoutException.class);
+
+    driver = new FiniteAppenderatorDriver(
+        createPersistFailAppenderator(),
+        allocator,
+        segmentHandoffNotifierFactory,
+        new NoopUsedSegmentChecker(),
+        OBJECT_MAPPER,
+        new FireDepartmentMetrics()
+    );
+
+    driver.startJob();
+
+    final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
+    segmentHandoffNotifierFactory.setHandoffDelay(100);
+
+    Assert.assertNull(driver.startJob());
+
+    for (int i = 0; i < ROWS.size(); i++) {
+      committerSupplier.setMetadata(i + 1);
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+    }
+
+    driver.publish(
+        FiniteAppenderatorDriverTest.makeOkPublisher(),
+        committerSupplier.get(),
+        ImmutableList.of("dummy")
+    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+  }
+
+  @Test
+  public void testInterruptDuringPush() throws IOException, InterruptedException, TimeoutException, ExecutionException
+  {
+    expectedException.expect(ExecutionException.class);
+    expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class));
+
+    driver = new FiniteAppenderatorDriver(
+        createPushInterruptAppenderator(),
+        allocator,
+        segmentHandoffNotifierFactory,
+        new NoopUsedSegmentChecker(),
+        OBJECT_MAPPER,
+        new FireDepartmentMetrics()
+    );
+
+    driver.startJob();
+
+    final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
+    segmentHandoffNotifierFactory.setHandoffDelay(100);
+
+    Assert.assertNull(driver.startJob());
+
+    for (int i = 0; i < ROWS.size(); i++) {
+      committerSupplier.setMetadata(i + 1);
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+    }
+
+    driver.publish(
+        FiniteAppenderatorDriverTest.makeOkPublisher(),
+        committerSupplier.get(),
+        ImmutableList.of("dummy")
+    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+  }
+
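+  // Here push fails with an immediately failed future rather than hanging. The expected
+  // outcome is still a timeout (not an ExecutionException), presumably because the driver
+  // retries failed publishes instead of propagating the error.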
+  @Test
+  public void testFailDuringPush() throws IOException, InterruptedException, TimeoutException, ExecutionException
+  {
+    expectedException.expect(TimeoutException.class);
+
+    driver = new FiniteAppenderatorDriver(
+        createPushFailAppenderator(),
+        allocator,
+        segmentHandoffNotifierFactory,
+        new NoopUsedSegmentChecker(),
+        OBJECT_MAPPER,
+        new FireDepartmentMetrics()
+    );
+
+    driver.startJob();
+
+    final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
+    segmentHandoffNotifierFactory.setHandoffDelay(100);
+
+    Assert.assertNull(driver.startJob());
+
+    for (int i = 0; i < ROWS.size(); i++) {
+      committerSupplier.setMetadata(i + 1);
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+    }
+
+    driver.publish(
+        FiniteAppenderatorDriverTest.makeOkPublisher(),
+        committerSupplier.get(),
+        ImmutableList.of("dummy")
+    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+  }
+
+  @Test
+  public void testFailDuringDrop() throws IOException, InterruptedException, TimeoutException, ExecutionException
+  {
+    expectedException.expect(ExecutionException.class);
+    expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
+    expectedException.expectMessage(
+        "Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]"
+    );
+
+    driver = new FiniteAppenderatorDriver(
+        createDropFailAppenderator(),
+        allocator,
+        segmentHandoffNotifierFactory,
+        new NoopUsedSegmentChecker(),
+        OBJECT_MAPPER,
+        new FireDepartmentMetrics()
+    );
+
+    driver.startJob();
+
+    final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
+    segmentHandoffNotifierFactory.setHandoffDelay(100);
+
+    Assert.assertNull(driver.startJob());
+
+    for (int i = 0; i < ROWS.size(); i++) {
+      committerSupplier.setMetadata(i + 1);
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+    }
+
+    final SegmentsAndMetadata published = driver.publish(
+        FiniteAppenderatorDriverTest.makeOkPublisher(),
+        committerSupplier.get(),
+        ImmutableList.of("dummy")
+    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+
+    driver.registerHandoff(published).get();
+  }
+
+  private static class NoopUsedSegmentChecker implements UsedSegmentChecker
+  {
+    @Override
+    public Set<DataSegment> findUsedSegments(Set<SegmentIdentifier> identifiers) throws IOException
+    {
+      return ImmutableSet.of();
+    }
+  }
+
+  static Appenderator createPushFailAppenderator()
+  {
+    return new FailableAppenderator().disablePush();
+  }
+
+  static Appenderator createPushInterruptAppenderator()
+  {
+    return new FailableAppenderator().interruptPush();
+  }
+
+  static Appenderator createPersistFailAppenderator()
+  {
+    return new FailableAppenderator().disablePersist();
+  }
+
+  static Appenderator createDropFailAppenderator()
+  {
+    return new FailableAppenderator().disableDrop();
+  }
+
+  private static class FailableAppenderator implements Appenderator
+  {
+    private final Map<SegmentIdentifier, List<InputRow>> rows = new HashMap<>();
+
+    private boolean dropEnabled = true;
+    private boolean persistEnabled = true;
+    private boolean pushEnabled = true;
+    private boolean interruptPush = false;
+
+    private int numRows;
+
+    public FailableAppenderator disableDrop()
+    {
+      dropEnabled = false;
+      return this;
+    }
+
+    public FailableAppenderator disablePersist()
+    {
+      persistEnabled = false;
+      return this;
+    }
+
+    public FailableAppenderator disablePush()
+    {
+      pushEnabled = false;
+      interruptPush = false;
+      return this;
+    }
+
+    public FailableAppenderator interruptPush()
+    {
+      pushEnabled = false;
+      interruptPush = true;
+      return this;
+    }
+
+    @Override
+    public String getDataSource()
+    {
+      return null;
+    }
+
+    @Override
+    public Object startJob()
+    {
+      return null;
+    }
+
+    @Override
+    public int add(
+        SegmentIdentifier identifier, InputRow row, Supplier<Committer> committerSupplier
+    ) throws IndexSizeExceededException, SegmentNotWritableException
+    {
+      rows.computeIfAbsent(identifier, k -> new ArrayList<>()).add(row);
+      return ++numRows;
+    }
+
+    @Override
+    public List<SegmentIdentifier> getSegments()
+    {
+      return ImmutableList.copyOf(rows.keySet());
+    }
+
+    @Override
+    public int getRowCount(SegmentIdentifier identifier)
+    {
+      final List<InputRow> rows = this.rows.get(identifier);
+      if (rows != null) {
+        return rows.size();
+      } else {
+        return 0;
+      }
+    }
+
+    @Override
+    public void clear() throws InterruptedException
+    {
+      rows.clear();
+    }
+
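+    // The lifecycle methods below either operate on the in-memory row map or, depending
+    // on the flags set by the builder-style methods above, return failed futures
+    // (or, for interruptPush, a future whose get() always throws InterruptedException).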
+    @Override
+    public ListenableFuture<?> drop(SegmentIdentifier identifier)
+    {
+      if (dropEnabled) {
+        rows.remove(identifier);
+        return Futures.immediateFuture(null);
+      } else {
+        return Futures.immediateFailedFuture(new ISE("Fail test while dropping segment[%s]", identifier));
+      }
+    }
+
+    @Override
+    public ListenableFuture<Object> persist(
+        Collection<SegmentIdentifier> identifiers, Committer committer
+    )
+    {
+      if (persistEnabled) {
+        // do nothing
+        return Futures.immediateFuture(committer.getMetadata());
+      } else {
+        return Futures.immediateFailedFuture(new ISE("Fail test while persisting segments[%s]", identifiers));
+      }
+    }
+
+    @Override
+    public ListenableFuture<SegmentsAndMetadata> push(
+        Collection<SegmentIdentifier> identifiers, Committer committer
+    )
+    {
+      if (pushEnabled) {
+        final List<DataSegment> segments = identifiers.stream()
+                                                      .map(
+                                                          id -> new DataSegment(
+                                                              id.getDataSource(),
+                                                              id.getInterval(),
+                                                              id.getVersion(),
+                                                              ImmutableMap.of(),
+                                                              ImmutableList.of(),
+                                                              ImmutableList.of(),
+                                                              id.getShardSpec(),
+                                                              0,
+                                                              0
+                                                          )
+                                                      )
+                                                      .collect(Collectors.toList());
+        return Futures.transform(
+            persist(identifiers, committer),
+            (Function<Object, SegmentsAndMetadata>) commitMetadata -> new SegmentsAndMetadata(segments, commitMetadata)
+        );
+      } else {
+        if (interruptPush) {
+          return new AbstractFuture<SegmentsAndMetadata>()
+          {
+            @Override
+            public SegmentsAndMetadata get(long timeout, TimeUnit unit)
+                throws InterruptedException, TimeoutException, ExecutionException
+            {
+              throw new InterruptedException("Interrupt test while pushing segments");
+            }
+
+            @Override
+            public SegmentsAndMetadata get() throws InterruptedException, ExecutionException
+            {
+              throw new InterruptedException("Interrupt test while pushing segments");
+            }
+          };
+        } else {
+          return Futures.immediateFailedFuture(new ISE("Fail test while pushing segments[%s]", identifiers));
+        }
+      }
+    }
+
+    @Override
+    public void close()
+    {
+
+    }
+
+    @Override
+    public <T> QueryRunner<T> getQueryRunnerForIntervals(
+        Query<T> query, Iterable<Interval> intervals
+    )
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> QueryRunner<T> getQueryRunnerForSegments(
+        Query<T> query, Iterable<SegmentDescriptor> specs
+    )
+    {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java
index a7c1052e0695..9357a051d5d8 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java
@@ -29,6 +29,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
@@ -56,7 +57,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -67,7 +71,8 @@ public class FiniteAppenderatorDriverTest
   private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
   private static final int MAX_ROWS_IN_MEMORY = 100;
   private static final int MAX_ROWS_PER_SEGMENT = 3;
-  private static final long HANDOFF_CONDITION_TIMEOUT = 0;
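+  // Publish and handoff are now separate steps, so each gets its own timeout below.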
+  private static final long PUBLISH_TIMEOUT = 1000;
+  private static final long HANDOFF_CONDITION_TIMEOUT = 1000;
 
   private static final List<InputRow> ROWS = Arrays.asList(
       new MapBasedInputRow(
@@ -89,6 +94,7 @@ public class FiniteAppenderatorDriverTest
 
   SegmentAllocator allocator;
   AppenderatorTester appenderatorTester;
+  TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
   FiniteAppenderatorDriver driver;
 
   @Before
@@ -96,14 +102,13 @@ public void setUp()
   {
     appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
     allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
+    segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
     driver = new FiniteAppenderatorDriver(
         appenderatorTester.getAppenderator(),
         allocator,
-        new TestSegmentHandoffNotifierFactory(),
+        segmentHandoffNotifierFactory,
         new TestUsedSegmentChecker(),
         OBJECT_MAPPER,
-        MAX_ROWS_PER_SEGMENT,
-        HANDOFF_CONDITION_TIMEOUT,
         new FireDepartmentMetrics()
     );
   }
@@ -124,13 +129,16 @@ public void testSimple() throws Exception
 
     for (int i = 0; i < ROWS.size(); i++) {
       committerSupplier.setMetadata(i + 1);
-      Assert.assertNotNull(driver.add(ROWS.get(i), "dummy", committerSupplier));
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
     }
 
-    final SegmentsAndMetadata segmentsAndMetadata = driver.finish(
+    final SegmentsAndMetadata published = driver.publish(
         makeOkPublisher(),
-        committerSupplier.get()
-    );
+        committerSupplier.get(),
+        ImmutableList.of("dummy")
+    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
+                                                          .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
 
     Assert.assertEquals(
         ImmutableSet.of(
@@ -155,21 +163,178 @@ public void testMaxRowsPerSegment() throws Exception
       InputRow row = new MapBasedInputRow(
           new DateTime("2000T01"),
           ImmutableList.of("dim2"),
-          ImmutableMap.<String, Object>of(
+          ImmutableMap.of(
              "dim2", String.format("bar-%d", i),
              "met1", 2.0
          )
      );
 
-      Assert.assertNotNull(driver.add(row, "dummy", committerSupplier));
+      final AppenderatorDriverAddResult addResult = driver.add(row, "dummy", committerSupplier);
+      Assert.assertTrue(addResult.isOk());
+      if (addResult.getNumRowsInSegment() > MAX_ROWS_PER_SEGMENT) {
+        driver.moveSegmentOut("dummy", ImmutableList.of(addResult.getSegmentIdentifier()));
+      }
     }
 
-    final SegmentsAndMetadata segmentsAndMetadata = driver.finish(makeOkPublisher(), committerSupplier.get());
+    final SegmentsAndMetadata published = driver.publish(
+        makeOkPublisher(),
+        committerSupplier.get(),
+        ImmutableList.of("dummy")
+    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
+                                                          .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
 
     Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size());
     Assert.assertEquals(numSegments * MAX_ROWS_PER_SEGMENT, segmentsAndMetadata.getCommitMetadata());
   }
 
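+  // Handoff is disabled below, so publish succeeds but waiting on registerHandoff()
+  // must time out.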
+  @Test(timeout = 5000L, expected = TimeoutException.class)
+  public void testHandoffTimeout() throws Exception
+  {
+    final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
+    segmentHandoffNotifierFactory.disableHandoff();
+
+    Assert.assertNull(driver.startJob());
+
+    for (int i = 0; i < ROWS.size(); i++) {
+      committerSupplier.setMetadata(i + 1);
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+    }
+
+    final SegmentsAndMetadata published = driver.publish(
+        makeOkPublisher(),
+        committerSupplier.get(),
+        ImmutableList.of("dummy")
+    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+
+    driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+  }
+
+  @Test
+  public void testPublishPerRow() throws IOException, InterruptedException, TimeoutException, ExecutionException
+  {
+    final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
+
+    Assert.assertNull(driver.startJob());
+
+    // Add the first row and publish immediately
+    {
+      committerSupplier.setMetadata(1);
+      Assert.assertTrue(driver.add(ROWS.get(0), "dummy", committerSupplier).isOk());
+
+      final SegmentsAndMetadata published = driver.publish(
+          makeOkPublisher(),
+          committerSupplier.get(),
+          ImmutableList.of("dummy")
+      ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+      final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
+                                                            .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+      Assert.assertEquals(
+          ImmutableSet.of(
+              new SegmentIdentifier(DATA_SOURCE, new Interval("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0))
+          ),
+          asIdentifiers(segmentsAndMetadata.getSegments())
+      );
+
+      Assert.assertEquals(1, segmentsAndMetadata.getCommitMetadata());
+    }
+
+    // Add the second and third rows and publish immediately
+    for (int i = 1; i < ROWS.size(); i++) {
+      committerSupplier.setMetadata(i + 1);
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+
+      final SegmentsAndMetadata published = driver.publish(
+          makeOkPublisher(),
+          committerSupplier.get(),
+          ImmutableList.of("dummy")
+      ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+      final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
+                                                            .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+      Assert.assertEquals(
+          ImmutableSet.of(
+              // The second and third rows have the same dataSource, interval, and version,
+              // but their shardSpecs have different partitionNums
+              new SegmentIdentifier(DATA_SOURCE, new Interval("2000T01/PT1H"), VERSION, new NumberedShardSpec(i - 1, 0))
+          ),
+          asIdentifiers(segmentsAndMetadata.getSegments())
+      );
+
+      Assert.assertEquals(i + 1, segmentsAndMetadata.getCommitMetadata());
+    }
+
+    driver.persist(committerSupplier.get());
+
+    // There are no remaining rows in the driver, so the result must be empty
+    final SegmentsAndMetadata published = driver.publish(
+        makeOkPublisher(),
+        committerSupplier.get(),
+        ImmutableList.of("dummy")
+    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
+                                                          .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+
+    Assert.assertEquals(
+        ImmutableSet.of(),
+        asIdentifiers(segmentsAndMetadata.getSegments())
+    );
+
+    Assert.assertEquals(3, segmentsAndMetadata.getCommitMetadata());
+  }
+
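+  // The two sequences are published and handed off independently. Each future should
+  // complete with only its own segments, while the commit metadata (3) reflects all
+  // rows added across both sequences.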
+  @Test
+  public void testIncrementalHandoff() throws Exception
+  {
+    final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
+
+    Assert.assertNull(driver.startJob());
+
+    committerSupplier.setMetadata(1);
+    Assert.assertTrue(driver.add(ROWS.get(0), "sequence_0", committerSupplier).isOk());
+
+    for (int i = 1; i < ROWS.size(); i++) {
+      committerSupplier.setMetadata(i + 1);
+      Assert.assertTrue(driver.add(ROWS.get(i), "sequence_1", committerSupplier).isOk());
+    }
+
+    final ListenableFuture<SegmentsAndMetadata> futureForSequence0 = driver.publishAndRegisterHandoff(
+        makeOkPublisher(),
+        committerSupplier.get(),
+        ImmutableList.of("sequence_0")
+    );
+
+    final ListenableFuture<SegmentsAndMetadata> futureForSequence1 = driver.publishAndRegisterHandoff(
+        makeOkPublisher(),
+        committerSupplier.get(),
+        ImmutableList.of("sequence_1")
+    );
+
+    final SegmentsAndMetadata handedoffFromSequence0 = futureForSequence0.get(
+        HANDOFF_CONDITION_TIMEOUT,
+        TimeUnit.MILLISECONDS
+    );
+    final SegmentsAndMetadata handedoffFromSequence1 = futureForSequence1.get(
+        HANDOFF_CONDITION_TIMEOUT,
+        TimeUnit.MILLISECONDS
+    );
+
+    Assert.assertEquals(
+        ImmutableSet.of(
+            new SegmentIdentifier(DATA_SOURCE, new Interval("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0))
+        ),
+        asIdentifiers(handedoffFromSequence0.getSegments())
+    );
+
+    Assert.assertEquals(
+        ImmutableSet.of(
+            new SegmentIdentifier(DATA_SOURCE, new Interval("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0))
+        ),
+        asIdentifiers(handedoffFromSequence1.getSegments())
+    );
+
+    Assert.assertEquals(3, handedoffFromSequence0.getCommitMetadata());
+    Assert.assertEquals(3, handedoffFromSequence1.getCommitMetadata());
+  }
+
   private Set<SegmentIdentifier> asIdentifiers(Iterable<DataSegment> segments)
   {
     return ImmutableSet.copyOf(
@@ -187,7 +352,7 @@ public SegmentIdentifier apply(DataSegment input)
     );
   }
 
-  private TransactionalSegmentPublisher makeOkPublisher()
+  static TransactionalSegmentPublisher makeOkPublisher()
   {
     return new TransactionalSegmentPublisher()
     {
@@ -199,7 +364,7 @@ public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata)
     };
   }
 
-  private static class TestCommitterSupplier<T> implements Supplier<Committer>
+  static class TestCommitterSupplier<T> implements Supplier<Committer>
   {
     private final AtomicReference<T> metadata = new AtomicReference<>();
 
@@ -229,7 +394,7 @@ public void run()
     }
   }
 
-  private static class TestSegmentAllocator implements SegmentAllocator
+  static class TestSegmentAllocator implements SegmentAllocator
   {
     private final String dataSource;
     private final Granularity granularity;
@@ -264,8 +429,21 @@ public SegmentIdentifier allocate(
     }
   }
 
-  private static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory
+  static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory
   {
+    private boolean handoffEnabled = true;
+    private long handoffDelay;
+
+    public void disableHandoff()
+    {
+      handoffEnabled = false;
+    }
+
+    public void setHandoffDelay(long delay)
+    {
+      handoffDelay = delay;
+    }
+
     @Override
     public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
     {
@@ -278,8 +456,19 @@ public boolean registerSegmentHandoffCallback(
         final Runnable handOffRunnable
     )
     {
-      // Immediate handoff
-      exec.execute(handOffRunnable);
+      if (handoffEnabled) {
+
+        if (handoffDelay > 0) {
+          try {
+            Thread.sleep(handoffDelay);
+          }
+          catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+        exec.execute(handOffRunnable);
+      }
       return true;
     }