From aaee65eef9e733b849cd0506195d5138af49e89e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 19 May 2017 03:17:56 +0900 Subject: [PATCH 1/7] Refactoring Appenderator 1) Added publishExecutor and handoffExecutor for background publishing and handing segments off 2) Change add() to not move segments out in it --- .../extensions-core/kafka-ingestion.md | 1 + docs/content/ingestion/tasks.md | 1 + .../druid/indexing/kafka/KafkaIndexTask.java | 96 ++-- .../indexing/kafka/KafkaTuningConfig.java | 13 + .../kafka/supervisor/KafkaSupervisorSpec.java | 1 + .../KafkaSupervisorTuningConfig.java | 2 + .../indexing/kafka/KafkaIndexTaskTest.java | 1 + .../indexing/kafka/KafkaTuningConfigTest.java | 14 +- .../kafka/supervisor/KafkaSupervisorTest.java | 1 + .../druid/indexing/common/task/IndexTask.java | 47 +- .../indexing/common/task/IndexTaskTest.java | 3 +- .../indexing/common/task/TaskSerdeTest.java | 4 +- .../indexing/overlord/TaskLifecycleTest.java | 6 +- .../indexing/RealtimeTuningConfig.java | 1 - .../realtime/appenderator/Appenderator.java | 8 + .../AppenderatorDriverAddResult.java | 66 +++ .../appenderator/AppenderatorImpl.java | 6 + .../FiniteAppenderatorDriver.java | 477 ++++++++++-------- .../FiniteAppenderatorDriverMetadata.java | 10 + .../FiniteAppenderatorDriverTest.java | 71 ++- 20 files changed, 549 insertions(+), 280 deletions(-) create mode 100644 server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index ca16a5f0f85b..8ca065a36b6d 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -122,6 +122,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| 
|`buildV9Directly`|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)| |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| +|`publishTimeout`|Long|Milliseconds to wait for publishing segments. It must be >= 0, where 0 means to wait forever.|no (default == 0)| |`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| |`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)| |`workerThreads`|Integer|The number of threads that will be used by the supervisor for asynchronous operations.|no (default == min(10, taskCount))| diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index 4e72e993c162..db8be91e57eb 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -120,6 +120,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |buildV9Directly|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|true|no| |forceExtendableShardSpecs|Forces use of extendable shardSpecs. 
Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no| |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| +|publishTimeout|Milliseconds to wait for publishing segments. It must be >= 0, where 0 means to wait forever.|0|no| #### IndexSpec diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index cb42ee57f3b7..712fdeff659e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -65,9 +65,9 @@ import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver; -import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import io.druid.segment.realtime.firehose.ChatHandler; @@ -92,11 +92,11 @@ import javax.ws.rs.core.Response; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; -import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -434,13 
+434,20 @@ public void run() if (!ioConfig.getMinimumMessageTime().isPresent() || !ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp())) { - final SegmentIdentifier identifier = driver.add( + final String sequenceName = sequenceNames.get(record.partition()); + final AppenderatorDriverAddResult addResult = driver.add( row, - sequenceNames.get(record.partition()), + sequenceName, committerSupplier ); - if (identifier == null) { + if (addResult.isOk()) { + // If the number of rows in the segment exceeds the threshold after adding a row, + // move the segment out from the active segments of AppenderatorDriver to make a new segment. + if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) { + driver.moveSegmentOut(sequenceName, ImmutableList.of(addResult.getSegmentIdentifier())); + } + } else { // Failure to allocate segment puts determinism at risk, bail out to be safe. // May want configurable behavior here at some point. // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. @@ -491,48 +498,66 @@ public void run() status = Status.PUBLISHING; } - final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher() - { - @Override - public boolean publishSegments(Set segments, Object commitMetadata) throws IOException - { - final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( - ((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS), - KafkaPartitions.class - ); + final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> { + final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( + ((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS), + KafkaPartitions.class + ); - // Sanity check, we should only be publishing things that match our desired end state. - if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) { - throw new ISE("WTF?! 
Driver attempted to publish invalid metadata[%s].", commitMetadata); - } + // Sanity check, we should only be publishing things that match our desired end state. + if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) { + throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata); + } - final SegmentTransactionalInsertAction action; + final SegmentTransactionalInsertAction action; - if (ioConfig.isUseTransaction()) { - action = new SegmentTransactionalInsertAction( - segments, - new KafkaDataSourceMetadata(ioConfig.getStartPartitions()), - new KafkaDataSourceMetadata(finalPartitions) - ); - } else { - action = new SegmentTransactionalInsertAction(segments, null, null); - } + if (ioConfig.isUseTransaction()) { + action = new SegmentTransactionalInsertAction( + segments, + new KafkaDataSourceMetadata(ioConfig.getStartPartitions()), + new KafkaDataSourceMetadata(finalPartitions) + ); + } else { + action = new SegmentTransactionalInsertAction(segments, null, null); + } - log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction()); + log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction()); - return toolbox.getTaskActionClient().submit(action).isSuccess(); - } + return toolbox.getTaskActionClient().submit(action).isSuccess(); }; - final SegmentsAndMetadata published = driver.finish(publisher, committerSupplier.get()); - if (published == null) { + final SegmentsAndMetadata published; + if (tuningConfig.getPublishTimeout() == 0) { + published = driver.publish( + publisher, + committerSupplier.get(), + sequenceNames.values() + ).get(); + } else { + published = driver.publish( + publisher, + committerSupplier.get(), + sequenceNames.values() + ).get(tuningConfig.getPublishTimeout(), TimeUnit.MILLISECONDS); + } + + final SegmentsAndMetadata handedOff; + if (tuningConfig.getHandoffConditionTimeout() == 0) { + handedOff = driver.registerHandoff(published) + .get(); + } else { + handedOff = 
driver.registerHandoff(published) + .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); + } + + if (handedOff == null) { throw new ISE("Transaction failure publishing segments, aborting"); } else { log.info( "Published segments[%s] with metadata[%s].", Joiner.on(", ").join( Iterables.transform( - published.getSegments(), + handedOff.getSegments(), new Function() { @Override @@ -543,7 +568,7 @@ public String apply(DataSegment input) } ) ), - published.getCommitMetadata() + handedOff.getCommitMetadata() ); } } @@ -865,7 +890,6 @@ private FiniteAppenderatorDriver newDriver( toolbox.getSegmentHandoffNotifierFactory(), new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), toolbox.getObjectMapper(), - tuningConfig.getMaxRowsPerSegment(), tuningConfig.getHandoffConditionTimeout(), metrics ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index fa721f2bd7aa..0e2d4b2fd439 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -33,6 +33,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig { private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; + private static final long DEFAULT_PUBLISH_TIMEOUT = 0; private final int maxRowsInMemory; private final int maxRowsPerSegment; @@ -42,6 +43,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig private final IndexSpec indexSpec; private final boolean buildV9Directly; private final boolean reportParseExceptions; + private final long publishTimeout; private final long handoffConditionTimeout; private final boolean resetOffsetAutomatically; 
@@ -55,6 +57,7 @@ public KafkaTuningConfig( @JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, + @JsonProperty("publishTimeout") Long publishTimeout, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically ) @@ -74,6 +77,7 @@ public KafkaTuningConfig( this.reportParseExceptions = reportParseExceptions == null ? defaults.isReportParseExceptions() : reportParseExceptions; + this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout; this.handoffConditionTimeout = handoffConditionTimeout == null ? defaults.getHandoffConditionTimeout() : handoffConditionTimeout; @@ -93,6 +97,7 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config) config.indexSpec, config.buildV9Directly, config.reportParseExceptions, + config.publishTimeout, config.handoffConditionTimeout, config.resetOffsetAutomatically ); @@ -152,6 +157,12 @@ public boolean isReportParseExceptions() return reportParseExceptions; } + @JsonProperty + public long getPublishTimeout() + { + return publishTimeout; + } + @JsonProperty public long getHandoffConditionTimeout() { @@ -175,6 +186,7 @@ public KafkaTuningConfig withBasePersistDirectory(File dir) indexSpec, buildV9Directly, reportParseExceptions, + publishTimeout, handoffConditionTimeout, resetOffsetAutomatically ); @@ -191,6 +203,7 @@ public KafkaTuningConfig withMaxRowsInMemory(int rows) indexSpec, buildV9Directly, reportParseExceptions, + publishTimeout, handoffConditionTimeout, resetOffsetAutomatically ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 2bcd5c70ccf6..c476b05e1053 100644 --- 
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -87,6 +87,7 @@ public KafkaSupervisorSpec( null, null, null, + null, null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 0e28d4abcb23..0dd20ef02fce 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -44,6 +44,7 @@ public KafkaSupervisorTuningConfig( @JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, + @JsonProperty("publishTimeout") Long publishTimeout, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically, @JsonProperty("workerThreads") Integer workerThreads, @@ -62,6 +63,7 @@ public KafkaSupervisorTuningConfig( indexSpec, buildV9Directly, reportParseExceptions, + publishTimeout, handoffConditionTimeout, resetOffsetAutomatically ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 52f751757fd1..96ac70c86580 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ 
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1381,6 +1381,7 @@ private KafkaIndexTask createTask( null, buildV9Directly, reportParseExceptions, + null, handoffConditionTimeout, resetOffsetAutomatically ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java index 70d2766ef81e..1d5d453db501 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -104,7 +104,19 @@ public void testSerdeWithNonDefaults() throws Exception @Test public void testCopyOf() throws Exception { - KafkaTuningConfig original = new KafkaTuningConfig(1, 2, new Period("PT3S"), new File("/tmp/xxx"), 4, new IndexSpec(), true, true, 5L, null); + KafkaTuningConfig original = new KafkaTuningConfig( + 1, + 2, + new Period("PT3S"), + new File("/tmp/xxx"), + 4, + new IndexSpec(), + true, + true, + null, + 5L, + null + ); KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original); Assert.assertEquals(1, copy.getMaxRowsInMemory()); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 17507339da38..c811b63f8053 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -178,6 +178,7 @@ public void setUp() throws Exception false, null, null, + null, numThreads, TEST_CHAT_THREADS, TEST_CHAT_RETRIES, diff --git 
a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index a39a2238c79b..6a01102d38d2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -30,6 +30,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; @@ -74,6 +75,7 @@ import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.AppenderatorConfig; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver; import io.druid.segment.realtime.appenderator.SegmentAllocator; @@ -100,6 +102,9 @@ import java.util.Map; import java.util.Set; import java.util.SortedSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class IndexTask extends AbstractTask { @@ -451,9 +456,9 @@ public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, Strin sequenceNameToShardSpecMap.put(sequenceName, shardSpecForPublishing); } - final SegmentIdentifier identifier = driver.add(inputRow, sequenceName, committerSupplier); + final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier); - if (identifier == null) { + if (!addResult.isOk()) { throw new ISE("Could not allocate segment for row with timestamp[%s]", inputRow.getTimestamp()); } @@ -482,7 +487,22 @@ public 
boolean publishSegments(Set segments, Object commitMetadata) } }; - final SegmentsAndMetadata published = driver.finish(publisher, committerSupplier.get()); + final SegmentsAndMetadata published; + final long publishTimeout = ingestionSchema.getTuningConfig().getPublishTimeout(); + if (publishTimeout == 0) { + published = driver.publish( + publisher, + committerSupplier.get(), + sequenceNameToShardSpecMap.keySet() + ).get(); + } else { + published = driver.publish( + publisher, + committerSupplier.get(), + sequenceNameToShardSpecMap.keySet() + ).get(ingestionSchema.getTuningConfig().getPublishTimeout(), TimeUnit.MILLISECONDS); + } + if (published == null) { log.error("Failed to publish segments, aborting!"); return false; @@ -505,6 +525,9 @@ public String apply(DataSegment input) return true; } } + catch (TimeoutException | ExecutionException e) { + throw Throwables.propagate(e); + } } private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox, DataSchema dataSchema) @@ -533,7 +556,6 @@ private FiniteAppenderatorDriver newDriver( new NoopSegmentHandoffNotifierFactory(), // don't wait for handoff since we don't serve queries new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), toolbox.getObjectMapper(), - Integer.MAX_VALUE, // rows for a partition is already determined by the shardSpec 0, metrics ); @@ -558,7 +580,7 @@ public IndexIngestionSpec( this.ioConfig = ioConfig; this.tuningConfig = tuningConfig == null ? 
- new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null) + new IndexTuningConfig(null, null, null, null, null, null, null, null, null, (File) null) : tuningConfig; } @@ -624,6 +646,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private static final boolean DEFAULT_BUILD_V9_DIRECTLY = true; private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false; private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; + private static final long DEFAULT_PUBLISH_TIMEOUT = 0; static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000; @@ -636,6 +659,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private final boolean buildV9Directly; private final boolean forceExtendableShardSpecs; private final boolean reportParseExceptions; + private final long publishTimeout; @JsonCreator public IndexTuningConfig( @@ -647,7 +671,8 @@ public IndexTuningConfig( @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs, - @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions + @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, + @JsonProperty("publishTimeout") @Nullable Long publishTimeout ) { this( @@ -659,6 +684,7 @@ public IndexTuningConfig( buildV9Directly, forceExtendableShardSpecs, reportParseExceptions, + publishTimeout, null ); } @@ -672,6 +698,7 @@ private IndexTuningConfig( @Nullable Boolean buildV9Directly, @Nullable Boolean forceExtendableShardSpecs, @Nullable Boolean reportParseExceptions, + @Nullable Long publishTimeout, @Nullable File basePersistDirectory ) { @@ -696,6 +723,7 @@ private IndexTuningConfig( this.reportParseExceptions = reportParseExceptions == null ? 
DEFAULT_REPORT_PARSE_EXCEPTIONS : reportParseExceptions; + this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout; this.basePersistDirectory = basePersistDirectory; } @@ -710,6 +738,7 @@ public IndexTuningConfig withBasePersistDirectory(File dir) buildV9Directly, forceExtendableShardSpecs, reportParseExceptions, + publishTimeout, dir ); } @@ -772,6 +801,12 @@ public boolean isForceExtendableShardSpecs() return forceExtendableShardSpecs; } + @JsonProperty + public long getPublishTimeout() + { + return publishTimeout; + } + @Override public Period getIntermediatePersistPeriod() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 94b3459f3921..282a6740cc10 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -606,7 +606,8 @@ private IndexTask.IndexIngestionSpec createIngestionSpec( null, true, forceExtendableShardSpecs, - true + true, + null ) ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 515b3913f2f5..b74780fc38a3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -186,7 +186,7 @@ public void testIndexTaskSerde() throws Exception jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true) + new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true, null) ), null, jsonMapper @@ -250,7 +250,7 @@ public void testIndexTaskwithResourceSerde() throws Exception 
jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) ), null, jsonMapper diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 6fe8e11d95bf..39064903942d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -649,7 +649,7 @@ public void testIndexTask() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) ), null, MAPPER @@ -707,7 +707,7 @@ public void testIndexTaskFailure() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) ), null, MAPPER @@ -1066,7 +1066,7 @@ public void testResumeTasks() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null, null) ), null, MAPPER diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index b95139bc9b03..99b639da3685 100644 --- 
a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -133,7 +133,6 @@ public RealtimeTuningConfig( this.reportParseExceptions = reportParseExceptions == null ? defaultReportParseExceptions : reportParseExceptions; - this.handoffConditionTimeout = handoffConditionTimeout == null ? defaultHandoffConditionTimeout : handoffConditionTimeout; diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java index f869ab6894c7..fdb3bd137a77 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java @@ -80,6 +80,14 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe */ List getSegments(); + /** + * Indicates that the given segment exists. + * + * @param identifier segment to examine + * @return true if exists. + */ + boolean containsSegment(SegmentIdentifier identifier); + /** * Returns the number of rows in a particular pending segment. * diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java new file mode 100644 index 000000000000..6d0ee1b5e581 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java @@ -0,0 +1,66 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.google.common.base.Supplier; +import io.druid.data.input.InputRow; + +import javax.annotation.Nullable; + +/** + * Result of {@link FiniteAppenderatorDriver#add(InputRow, String, Supplier)}. It contains the identifier of the + * segment which the InputRow is added to and the number of rows in that segment. + */ +public class AppenderatorDriverAddResult +{ + private final SegmentIdentifier segmentIdentifier; + private final int numRowsInSegment; + + public static AppenderatorDriverAddResult ok(SegmentIdentifier segmentIdentifier, int numRowsInSegment) + { + return new AppenderatorDriverAddResult(segmentIdentifier, numRowsInSegment); + } + + public static AppenderatorDriverAddResult fail() + { + return new AppenderatorDriverAddResult(null, 0); + } + + private AppenderatorDriverAddResult(@Nullable SegmentIdentifier segmentIdentifier, int numRowsInSegment) + { + this.segmentIdentifier = segmentIdentifier; + this.numRowsInSegment = numRowsInSegment; + } + + public boolean isOk() + { + return segmentIdentifier != null; + } + + public SegmentIdentifier getSegmentIdentifier() + { + return segmentIdentifier; + } + + public int getNumRowsInSegment() + { + return numRowsInSegment; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 1c3e2794b94d..db2e9769e86f 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java 
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -230,6 +230,12 @@ public List getSegments() return ImmutableList.copyOf(sinks.keySet()); } + @Override + public boolean containsSegment(SegmentIdentifier identifier) + { + return sinks.containsKey(identifier); + } + @Override public int getRowCount(final SegmentIdentifier identifier) { diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java index 8119f81f06ae..58ba077a666e 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java @@ -25,7 +25,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -34,8 +33,9 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; - +import io.druid.concurrent.Execs; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.java.util.common.ISE; @@ -47,14 +47,20 @@ import io.druid.timeline.DataSegment; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.Random; import 
java.util.Set; import java.util.TreeMap; +import java.util.stream.Collectors; /** * A FiniteAppenderatorDriver drives an Appenderator to index a finite stream of data. This class does not help you @@ -76,21 +82,27 @@ public class FiniteAppenderatorDriver implements Closeable private final SegmentHandoffNotifier handoffNotifier; private final UsedSegmentChecker usedSegmentChecker; private final ObjectMapper objectMapper; - private final int maxRowsPerSegment; private final long handoffConditionTimeout; private final FireDepartmentMetrics metrics; - // All access to "activeSegments" and "lastSegmentId" must be synchronized on "activeSegments". + // All access to "activeSegments", "publishPendingSegments", and "lastSegmentId" must be synchronized on + // "activeSegments". // sequenceName -> start of segment interval -> segment we're currently adding data to private final Map> activeSegments = new TreeMap<>(); + // sequenceName -> list of segmentIdentifiers + private final Map> publishPendingSegments = new HashMap<>(); + // sequenceName -> most recently allocated segment private final Map lastSegmentIds = Maps.newHashMap(); // Notified when segments are dropped. private final Object handoffMonitor = new Object(); + private final ListeningExecutorService publishExecutor; + private final ListeningExecutorService handoffExecutor; + /** * Create a driver. * @@ -99,9 +111,6 @@ public class FiniteAppenderatorDriver implements Closeable * @param handoffNotifierFactory handoff notifier factory * @param usedSegmentChecker used segment checker * @param objectMapper object mapper, used for serde of commit metadata - * @param maxRowsPerSegment maximum number of rows allowed in an entire segment (not a single persist) - * @param handoffConditionTimeout maximum number of millis allowed for handoff (not counting push/publish), zero - * means wait forever. 
* @param metrics Firedepartment metrics */ public FiniteAppenderatorDriver( @@ -110,7 +119,6 @@ public FiniteAppenderatorDriver( SegmentHandoffNotifierFactory handoffNotifierFactory, UsedSegmentChecker usedSegmentChecker, ObjectMapper objectMapper, - int maxRowsPerSegment, long handoffConditionTimeout, FireDepartmentMetrics metrics ) @@ -121,9 +129,10 @@ public FiniteAppenderatorDriver( .createSegmentHandoffNotifier(appenderator.getDataSource()); this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker"); this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper"); - this.maxRowsPerSegment = maxRowsPerSegment; this.handoffConditionTimeout = handoffConditionTimeout; this.metrics = Preconditions.checkNotNull(metrics, "metrics"); + this.publishExecutor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d")); + this.handoffExecutor = MoreExecutors.listeningDecorator(Execs.singleThreaded("handoff-%d")); } /** @@ -151,13 +160,14 @@ public Object startJob() final String sequenceName = entry.getKey(); final TreeMap segmentMap = Maps.newTreeMap(); - lastSegmentIds.put(sequenceName, metadata.getLastSegmentIds().get(sequenceName)); activeSegments.put(sequenceName, segmentMap); for (SegmentIdentifier identifier : entry.getValue()) { segmentMap.put(identifier.getInterval().getStartMillis(), identifier); } } + publishPendingSegments.putAll(metadata.getPublishPendingSegments()); + lastSegmentIds.putAll(metadata.getLastSegmentIds()); } return metadata.getCallerMetadata(); @@ -166,6 +176,18 @@ public Object startJob() } } + private void addSegment(String sequenceName, SegmentIdentifier identifier) + { + synchronized (activeSegments) { + activeSegments.computeIfAbsent(sequenceName, k -> new TreeMap<>()) + .putIfAbsent(identifier.getInterval().getStartMillis(), identifier); + + publishPendingSegments.computeIfAbsent(sequenceName, k -> new ArrayList<>()) + .add(identifier); + lastSegmentIds.put(sequenceName, 
identifier.getIdentifierAsString()); + } + } + /** * Clears out all our state and also calls {@link Appenderator#clear()} on the underlying Appenderator. */ @@ -188,7 +210,7 @@ public void clear() throws InterruptedException * * @throws IOException if there is an I/O error while allocating or writing to a segment */ - public SegmentIdentifier add( + public AppenderatorDriverAddResult add( final InputRow row, final String sequenceName, final Supplier committerSupplier @@ -203,16 +225,14 @@ public SegmentIdentifier add( if (identifier != null) { try { final int numRows = appenderator.add(identifier, row, wrapCommitterSupplier(committerSupplier)); - if (numRows >= maxRowsPerSegment) { - moveSegmentOut(sequenceName, ImmutableList.of(identifier)); - } + return AppenderatorDriverAddResult.ok(identifier, numRows); } catch (SegmentNotWritableException e) { throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", identifier); } + } else { + return AppenderatorDriverAddResult.fail(); } - - return identifier; } /** @@ -242,59 +262,93 @@ public Object persist(final Committer committer) throws InterruptedException } /** - * Publish all data indexed through this driver so far, and waits for it to be handed off. Blocks until complete. - * Retries forever on transient failures, but may exit early on permanent failures. - * - * Should be called after all data has been added and persisted through {@link #add(InputRow, String, Supplier)} and - * {@link #persist(Committer)}. + * Register the segments in the given {@link SegmentsAndMetadata} to be handed off and execute a background task which + * waits until the hand off completes. * - * @param publisher publisher to use for this set of segments - * @param committer committer representing all data that has been added so far - * - * @return segments and metadata published if successful, or null if segments could not be handed off due to - * transaction failure with commit metadata. 
+ * @param segmentsAndMetadata + * @return + * @throws InterruptedException */ - public SegmentsAndMetadata finish( - final TransactionalSegmentPublisher publisher, - final Committer committer - ) throws InterruptedException + public ListenableFuture registerHandoff(SegmentsAndMetadata segmentsAndMetadata) + throws InterruptedException { - final SegmentsAndMetadata segmentsAndMetadata = publishAll(publisher, wrapCommitter(committer)); - - if (segmentsAndMetadata != null) { - final long giveUpAt = handoffConditionTimeout > 0 - ? System.currentTimeMillis() + handoffConditionTimeout - : 0; + if (segmentsAndMetadata == null) { + return Futures.immediateFuture(null); - log.info("Awaiting handoff of segments: [%s]", Joiner.on(", ").join(appenderator.getSegments())); + } else { + final Object waitForHandoffMonitorStart = new Object(); - synchronized (handoffMonitor) { - while (!appenderator.getSegments().isEmpty()) { + final ListenableFuture future = handoffExecutor.submit( + () -> { + final List waitingSegmentIdList = segmentsAndMetadata.getSegments().stream() + .map(SegmentIdentifier::fromDataSegment) + .collect(Collectors.toList()); + log.info("Awaiting handoff of segments: [%s]", waitingSegmentIdList); - if (giveUpAt == 0) { - handoffMonitor.wait(); - } else { - final long remaining = giveUpAt - System.currentTimeMillis(); - if (remaining > 0) { - handoffMonitor.wait(remaining); - } else { - throw new ISE( - "Segment handoff wait timeout. 
Segments not yet handed off: [%s]", - Joiner.on(", ").join(appenderator.getSegments()) - ); + synchronized (waitForHandoffMonitorStart) { + waitForHandoffMonitorStart.notify(); + } + synchronized (handoffMonitor) { + while (waitingSegmentIdList.stream().anyMatch(appenderator::containsSegment)) { + handoffMonitor.wait(); + } } + + log.info("All segments handed off."); + + return new SegmentsAndMetadata( + segmentsAndMetadata.getSegments(), + ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() + ); } - } + ); + + // wait for handoffMonitor starts + log.info("Waiting handoff monitor starts"); + synchronized (waitForHandoffMonitorStart) { + waitForHandoffMonitorStart.wait(handoffConditionTimeout); } - log.info("All segments handed off."); + for (final DataSegment dataSegment : segmentsAndMetadata.getSegments()) { + handoffNotifier.registerSegmentHandoffCallback( + new SegmentDescriptor( + dataSegment.getInterval(), + dataSegment.getVersion(), + dataSegment.getShardSpec().getPartitionNum() + ), + MoreExecutors.sameThreadExecutor(), + () -> { + final SegmentIdentifier identifier = SegmentIdentifier.fromDataSegment(dataSegment); + log.info("Segment[%s] successfully handed off, dropping.", identifier); + metrics.incrementHandOffCount(); + final ListenableFuture dropFuture = appenderator.drop(identifier); + Futures.addCallback( + dropFuture, + new FutureCallback() + { + @Override + public void onSuccess(Object result) + { + synchronized (handoffMonitor) { + handoffMonitor.notifyAll(); + } + } - return new SegmentsAndMetadata( - segmentsAndMetadata.getSegments(), - ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() - ); - } else { - return null; + @Override + public void onFailure(Throwable e) + { + log.warn(e, "Failed to drop segment[%s]?!", identifier); + synchronized (handoffMonitor) { + handoffMonitor.notifyAll(); + } + } + } + ); + } + ); + } + + return future; } } @@ -304,6 
+358,8 @@ public SegmentsAndMetadata finish( @Override public void close() { + publishExecutor.shutdownNow(); + handoffExecutor.shutdownNow(); handoffNotifier.close(); } @@ -343,7 +399,6 @@ private SegmentIdentifier getSegment(final DateTime timestamp, final String sequ return existing; } else { // Allocate new segment. - final NavigableMap activeSegmentsForSequence = activeSegments.get(sequenceName); final SegmentIdentifier newSegment = segmentAllocator.allocate( timestamp, sequenceName, @@ -351,8 +406,6 @@ private SegmentIdentifier getSegment(final DateTime timestamp, final String sequ ); if (newSegment != null) { - final Long key = newSegment.getInterval().getStartMillis(); - for (SegmentIdentifier identifier : appenderator.getSegments()) { if (identifier.equals(newSegment)) { throw new ISE( @@ -364,11 +417,7 @@ private SegmentIdentifier getSegment(final DateTime timestamp, final String sequ } log.info("New segment[%s] for sequenceName[%s].", newSegment, sequenceName); - if (activeSegmentsForSequence == null) { - activeSegments.put(sequenceName, Maps.newTreeMap()); - } - activeSegments.get(sequenceName).put(key, newSegment); - lastSegmentIds.put(sequenceName, newSegment.getIdentifierAsString()); + addSegment(sequenceName, newSegment); } else { // Well, we tried. log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", timestamp, sequenceName); @@ -382,7 +431,7 @@ private SegmentIdentifier getSegment(final DateTime timestamp, final String sequ /** * Move a set of identifiers out from "active", making way for newer segments. 
*/ - private void moveSegmentOut(final String sequenceName, final List identifiers) + public void moveSegmentOut(final String sequenceName, final List identifiers) { synchronized (activeSegments) { final NavigableMap activeSegmentsForSequence = activeSegments.get(sequenceName); @@ -393,7 +442,7 @@ private void moveSegmentOut(final String sequenceName, final List publish( final TransactionalSegmentPublisher publisher, - final Committer wrappedCommitter + final Committer committer, + final Collection sequenceNames ) throws InterruptedException { - final List theSegments = ImmutableList.copyOf(appenderator.getSegments()); - - long nTry = 0; - while (true) { - try { - log.info("Pushing segments: [%s]", Joiner.on(", ").join(theSegments)); - final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(theSegments, wrappedCommitter).get(); - - // Sanity check - if (!segmentsToIdentifiers(segmentsAndMetadata.getSegments()).equals(Sets.newHashSet(theSegments))) { - throw new ISE( - "WTF?! Pushed different segments than requested. 
Pushed[%s], requested[%s].", - Joiner.on(", ").join(identifiersToStrings(segmentsToIdentifiers(segmentsAndMetadata.getSegments()))), - Joiner.on(", ").join(identifiersToStrings(theSegments)) - ); - } + final List theSegments; + synchronized (activeSegments) { + theSegments = sequenceNames.stream() + .map(publishPendingSegments::get) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } - log.info( - "Publishing segments with commitMetadata[%s]: [%s]", - segmentsAndMetadata.getCommitMetadata(), - Joiner.on(", ").join(segmentsAndMetadata.getSegments()) - ); + final ListenableFuture publishFuture = publish( + publisher, + wrapCommitter(committer), + theSegments + ); - if (segmentsAndMetadata.getSegments().isEmpty()) { - log.info("Nothing to publish, skipping publish step."); - } else { - final boolean published = publisher.publishSegments( - ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), - ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() - ); - - if (published) { - log.info("Published segments, awaiting handoff."); - } else { - log.info("Transaction failure while publishing segments, checking if someone else beat us to it."); - if (usedSegmentChecker.findUsedSegments(segmentsToIdentifiers(segmentsAndMetadata.getSegments())) - .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) { - log.info("Our segments really do exist, awaiting handoff."); - } else { - log.warn("Our segments don't exist, giving up."); - return null; + Futures.addCallback( + publishFuture, + new FutureCallback() + { + @Override + public void onSuccess(@Nullable SegmentsAndMetadata result) + { + if (result != null) { + synchronized (activeSegments) { + sequenceNames.forEach(publishPendingSegments::remove); + } } } + + @Override + public void onFailure(Throwable t) + { + log.error(t, "Failed to publish segments[%s]", theSegments); + } } + ); - for (final DataSegment dataSegment : 
segmentsAndMetadata.getSegments()) { - handoffNotifier.registerSegmentHandoffCallback( - new SegmentDescriptor( - dataSegment.getInterval(), - dataSegment.getVersion(), - dataSegment.getShardSpec().getPartitionNum() - ), - MoreExecutors.sameThreadExecutor(), - new Runnable() - { - @Override - public void run() - { - final SegmentIdentifier identifier = SegmentIdentifier.fromDataSegment(dataSegment); - log.info("Segment[%s] successfully handed off, dropping.", identifier); - metrics.incrementHandOffCount(); - final ListenableFuture dropFuture = appenderator.drop(identifier); - Futures.addCallback( - dropFuture, - new FutureCallback() - { - @Override - public void onSuccess(Object result) - { - synchronized (handoffMonitor) { - handoffMonitor.notifyAll(); - } - } - - @Override - public void onFailure(Throwable e) - { - log.warn(e, "Failed to drop segment[%s]?!"); - synchronized (handoffMonitor) { - handoffMonitor.notifyAll(); - } - } - } - ); + return publishFuture; + } + + /** + * Execute a task in background to publish the given segments. The task blocks until complete. + * Retries forever on transient failures, but may exit early on permanent failures. + * + * Should be called after all data has been added and persisted through {@link #add(InputRow, String, Supplier)} and + * {@link #persist(Committer)}. + * + * @param publisher publisher to use for this set of segments + * @param wrappedCommitter committer representing all data that has been added so far + * + * @return segments and metadata published if successful, or null if segments could not be handed off due to + * transaction failure with commit metadata. 
+ */ + private ListenableFuture publish( + final TransactionalSegmentPublisher publisher, + final WrappedCommitter wrappedCommitter, + final List segmentIdentifiers + ) throws InterruptedException + { + return publishExecutor.submit( + () -> { + long nTry = 0; + while (true) { + try { + log.info("Pushing segments: [%s]", Joiner.on(", ").join(segmentIdentifiers)); + final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(segmentIdentifiers, wrappedCommitter) + .get(); + + // Sanity check + final Set pushedSegments = segmentsAndMetadata.getSegments().stream() + .map(SegmentIdentifier::fromDataSegment) + .collect(Collectors.toSet()); + if (!pushedSegments.equals(Sets.newHashSet(segmentIdentifiers))) { + throw new ISE( + "WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].", + pushedSegments, + segmentIdentifiers + ); + } + + log.info( + "Publishing segments with commitMetadata[%s]: [%s]", + segmentsAndMetadata.getCommitMetadata(), + Joiner.on(", ").join(segmentsAndMetadata.getSegments()) + ); + + if (segmentsAndMetadata.getSegments().isEmpty()) { + log.info("Nothing to publish, skipping publish step."); + } else { + final boolean published = publisher.publishSegments( + ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), + ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() + ); + + if (published) { + log.info("Published segments, awaiting handoff."); + } else { + log.info("Transaction failure while publishing segments, checking if someone else beat us to it."); + if (usedSegmentChecker.findUsedSegments(pushedSegments) + .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) { + log.info("Our segments really do exist, awaiting handoff."); + } else { + log.warn("Our segments don't exist, giving up."); + return null; + } } } - ); + + return segmentsAndMetadata; + } + catch (InterruptedException e) { + throw e; + } + catch (Exception e) { + final long sleepMillis = 
computeNextRetrySleep(++nTry); + log.warn(e, "Failed publishAll (try %d), retrying in %,dms.", nTry, sleepMillis); + Thread.sleep(sleepMillis); + } + } } + ); + } - return segmentsAndMetadata; - } - catch (InterruptedException e) { - throw e; - } - catch (Exception e) { - final long sleepMillis = computeNextRetrySleep(++nTry); - log.warn(e, "Failed publishAll (try %d), retrying in %,dms.", nTry, sleepMillis); - Thread.sleep(sleepMillis); - } - } + private interface WrappedCommitter extends Committer + { } private Supplier wrapCommitterSupplier(final Supplier committerSupplier) { - return new Supplier() - { - @Override - public Committer get() - { - return wrapCommitter(committerSupplier.get()); - } - }; + return () -> wrapCommitter(committerSupplier.get()); } - private Committer wrapCommitter(final Committer committer) + private WrappedCommitter wrapCommitter(final Committer committer) { + final FiniteAppenderatorDriverMetadata wrappedMetadata; synchronized (activeSegments) { - final FiniteAppenderatorDriverMetadata wrappedMetadata = new FiniteAppenderatorDriverMetadata( + wrappedMetadata = new FiniteAppenderatorDriverMetadata( ImmutableMap.copyOf( Maps.transformValues( activeSegments, @@ -545,25 +615,26 @@ public List apply(NavigableMap input } ) ), + ImmutableMap.copyOf(publishPendingSegments), ImmutableMap.copyOf(lastSegmentIds), committer.getMetadata() ); + } - return new Committer() + return new WrappedCommitter() + { + @Override + public Object getMetadata() { - @Override - public Object getMetadata() - { - return wrappedMetadata; - } + return wrappedMetadata; + } - @Override - public void run() - { - committer.run(); - } - }; - } + @Override + public void run() + { + committer.run(); + } + }; } private static long computeNextRetrySleep(final long nTry) @@ -573,34 +644,4 @@ private static long computeNextRetrySleep(final long nTry) final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * new Random().nextGaussian(), 0), 2); return (long) 
(Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry)) * fuzzyMultiplier); } - - private static Set segmentsToIdentifiers(final Iterable segments) - { - return FluentIterable.from(segments) - .transform( - new Function() - { - @Override - public SegmentIdentifier apply(DataSegment segment) - { - return SegmentIdentifier.fromDataSegment(segment); - } - } - ).toSet(); - } - - private static Iterable identifiersToStrings(final Iterable identifiers) - { - return FluentIterable.from(identifiers) - .transform( - new Function() - { - @Override - public String apply(SegmentIdentifier input) - { - return input.getIdentifierAsString(); - } - } - ); - } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java index 65c7e70c7888..f38229da45e3 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java @@ -28,17 +28,20 @@ public class FiniteAppenderatorDriverMetadata { private final Map> activeSegments; + private final Map> publishPendingSegments; private final Map lastSegmentIds; private final Object callerMetadata; @JsonCreator public FiniteAppenderatorDriverMetadata( @JsonProperty("activeSegments") Map> activeSegments, + @JsonProperty("publishPendingSegments") Map> publishPendingSegments, @JsonProperty("lastSegmentIds") Map lastSegmentIds, @JsonProperty("callerMetadata") Object callerMetadata ) { this.activeSegments = activeSegments; + this.publishPendingSegments = publishPendingSegments; this.lastSegmentIds = lastSegmentIds; this.callerMetadata = callerMetadata; } @@ -49,6 +52,12 @@ public Map> getActiveSegments() return activeSegments; } + @JsonProperty + public Map> getPublishPendingSegments() + { + return publishPendingSegments; + } + @JsonProperty public 
Map getLastSegmentIds() { @@ -66,6 +75,7 @@ public String toString() { return "FiniteAppenderatorDriverMetadata{" + "activeSegments=" + activeSegments + + ", publishPendingSegments=" + publishPendingSegments + ", lastSegmentIds=" + lastSegmentIds + ", callerMetadata=" + callerMetadata + '}'; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java index a7c1052e0695..47be734651f6 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java @@ -57,6 +57,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -67,7 +69,8 @@ public class FiniteAppenderatorDriverTest private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); private static final int MAX_ROWS_IN_MEMORY = 100; private static final int MAX_ROWS_PER_SEGMENT = 3; - private static final long HANDOFF_CONDITION_TIMEOUT = 0; + private static final long PUBLISH_TIMEOUT = 1000; + private static final long HANDOFF_CONDITION_TIMEOUT = 1000; private static final List ROWS = Arrays.asList( new MapBasedInputRow( @@ -89,6 +92,7 @@ public class FiniteAppenderatorDriverTest SegmentAllocator allocator; AppenderatorTester appenderatorTester; + TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory; FiniteAppenderatorDriver driver; @Before @@ -96,13 +100,13 @@ public void setUp() { appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY); allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR); + segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory(); 
driver = new FiniteAppenderatorDriver( appenderatorTester.getAppenderator(), allocator, - new TestSegmentHandoffNotifierFactory(), + segmentHandoffNotifierFactory, new TestUsedSegmentChecker(), OBJECT_MAPPER, - MAX_ROWS_PER_SEGMENT, HANDOFF_CONDITION_TIMEOUT, new FireDepartmentMetrics() ); @@ -124,13 +128,16 @@ public void testSimple() throws Exception for (int i = 0; i < ROWS.size(); i++) { committerSupplier.setMetadata(i + 1); - Assert.assertNotNull(driver.add(ROWS.get(i), "dummy", committerSupplier)); + Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk()); } - final SegmentsAndMetadata segmentsAndMetadata = driver.finish( + final SegmentsAndMetadata published = driver.publish( makeOkPublisher(), - committerSupplier.get() - ); + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); + final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published) + .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS); Assert.assertEquals( ImmutableSet.of( @@ -155,21 +162,52 @@ public void testMaxRowsPerSegment() throws Exception InputRow row = new MapBasedInputRow( new DateTime("2000T01"), ImmutableList.of("dim2"), - ImmutableMap.of( + ImmutableMap.of( "dim2", String.format("bar-%d", i), "met1", 2.0 ) ); - Assert.assertNotNull(driver.add(row, "dummy", committerSupplier)); + final AppenderatorDriverAddResult addResult = driver.add(row, "dummy", committerSupplier); + Assert.assertTrue(addResult.isOk()); + if (addResult.getNumRowsInSegment() > MAX_ROWS_PER_SEGMENT) { + driver.moveSegmentOut("dummy", ImmutableList.of(addResult.getSegmentIdentifier())); + } } - final SegmentsAndMetadata segmentsAndMetadata = driver.finish(makeOkPublisher(), committerSupplier.get()); + final SegmentsAndMetadata published = driver.publish( + makeOkPublisher(), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); + final SegmentsAndMetadata segmentsAndMetadata = 
driver.registerHandoff(published) + .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS); Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size()); Assert.assertEquals(numSegments * MAX_ROWS_PER_SEGMENT, segmentsAndMetadata.getCommitMetadata()); } + @Test(timeout = 5000L, expected = TimeoutException.class) + public void testHandoffTimeout() throws Exception + { + final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); + segmentHandoffNotifierFactory.disableHandoff(); + + Assert.assertNull(driver.startJob()); + + for (int i = 0; i < ROWS.size(); i++) { + committerSupplier.setMetadata(i + 1); + Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk()); + } + + final SegmentsAndMetadata published = driver.publish( + makeOkPublisher(), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); + driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS); + } + private Set asIdentifiers(Iterable segments) { return ImmutableSet.copyOf( @@ -266,6 +304,13 @@ public SegmentIdentifier allocate( private static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory { + private boolean handoffEnabled = true; + + public void disableHandoff() + { + handoffEnabled = false; + } + @Override public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) { @@ -278,8 +323,10 @@ public boolean registerSegmentHandoffCallback( final Runnable handOffRunnable ) { - // Immediate handoff - exec.execute(handOffRunnable); + if (handoffEnabled) { + // Immediate handoff + exec.execute(handOffRunnable); + } return true; } From fa45bc421b6d11a0763a3e037ac77a8987a5ecf9 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 25 May 2017 08:16:55 +0900 Subject: [PATCH 2/7] Address comments 1) Remove publishTimeout for KafkaIndexTask 2) Simplifying registerHandoff() 3) Add increamental handoff test --- 
.../extensions-core/kafka-ingestion.md | 3 +- .../druid/indexing/kafka/KafkaIndexTask.java | 21 ++-- .../indexing/kafka/KafkaTuningConfig.java | 15 +-- .../kafka/supervisor/KafkaSupervisorSpec.java | 1 - .../KafkaSupervisorTuningConfig.java | 6 +- .../indexing/kafka/KafkaIndexTaskTest.java | 1 - .../indexing/kafka/KafkaTuningConfigTest.java | 1 - .../kafka/supervisor/KafkaSupervisorTest.java | 1 - .../realtime/appenderator/Appenderator.java | 8 -- .../appenderator/AppenderatorImpl.java | 6 - .../FiniteAppenderatorDriver.java | 108 +++++++++--------- .../FiniteAppenderatorDriverTest.java | 73 ++++++++++++ 12 files changed, 139 insertions(+), 105 deletions(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 8ca065a36b6d..0ce192c11b8e 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -122,8 +122,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| |`buildV9Directly`|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)| |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| -|`publishTimeout`|Long|Milliseconds to wait for publishing segments. It must be >= 0, where 0 means to wait forever.|no (default == 0)| -|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| +|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. This option is deprecated. 
Use `completionTimeout` of KafkaSupervisorIOConfig instead.|no (default == 0)| |`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)| |`workerThreads`|Integer|The number of threads that will be used by the supervisor for asynchronous operations.|no (default == min(10, taskCount))| |`chatThreads`|Integer|The number of threads that will be used for communicating with indexing tasks.|no (default == min(10, taskCount * replicas))| diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 712fdeff659e..474bc5a321f4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -526,20 +526,13 @@ public void run() return toolbox.getTaskActionClient().submit(action).isSuccess(); }; - final SegmentsAndMetadata published; - if (tuningConfig.getPublishTimeout() == 0) { - published = driver.publish( - publisher, - committerSupplier.get(), - sequenceNames.values() - ).get(); - } else { - published = driver.publish( - publisher, - committerSupplier.get(), - sequenceNames.values() - ).get(tuningConfig.getPublishTimeout(), TimeUnit.MILLISECONDS); - } + // 
Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting + // for hand off. See KafkaSupervisorIOConfig.completionTimeout. + final SegmentsAndMetadata published = driver.publish( + publisher, + committerSupplier.get(), + sequenceNames.values() + ).get(); final SegmentsAndMetadata handedOff; if (tuningConfig.getHandoffConditionTimeout() == 0) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index 0e2d4b2fd439..d257d7c98ec5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -33,7 +33,6 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig { private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; - private static final long DEFAULT_PUBLISH_TIMEOUT = 0; private final int maxRowsInMemory; private final int maxRowsPerSegment; @@ -43,7 +42,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig private final IndexSpec indexSpec; private final boolean buildV9Directly; private final boolean reportParseExceptions; - private final long publishTimeout; + @Deprecated private final long handoffConditionTimeout; private final boolean resetOffsetAutomatically; @@ -57,7 +56,6 @@ public KafkaTuningConfig( @JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, - @JsonProperty("publishTimeout") Long publishTimeout, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @JsonProperty("resetOffsetAutomatically") Boolean 
resetOffsetAutomatically ) @@ -77,7 +75,6 @@ public KafkaTuningConfig( this.reportParseExceptions = reportParseExceptions == null ? defaults.isReportParseExceptions() : reportParseExceptions; - this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout; this.handoffConditionTimeout = handoffConditionTimeout == null ? defaults.getHandoffConditionTimeout() : handoffConditionTimeout; @@ -97,7 +94,6 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config) config.indexSpec, config.buildV9Directly, config.reportParseExceptions, - config.publishTimeout, config.handoffConditionTimeout, config.resetOffsetAutomatically ); @@ -157,12 +153,7 @@ public boolean isReportParseExceptions() return reportParseExceptions; } - @JsonProperty - public long getPublishTimeout() - { - return publishTimeout; - } - + @Deprecated @JsonProperty public long getHandoffConditionTimeout() { @@ -186,7 +177,6 @@ public KafkaTuningConfig withBasePersistDirectory(File dir) indexSpec, buildV9Directly, reportParseExceptions, - publishTimeout, handoffConditionTimeout, resetOffsetAutomatically ); @@ -203,7 +193,6 @@ public KafkaTuningConfig withMaxRowsInMemory(int rows) indexSpec, buildV9Directly, reportParseExceptions, - publishTimeout, handoffConditionTimeout, resetOffsetAutomatically ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index c476b05e1053..2bcd5c70ccf6 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -87,7 +87,6 @@ public KafkaSupervisorSpec( null, null, null, - null, null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 0dd20ef02fce..c04771a6ae69 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -44,8 +44,7 @@ public KafkaSupervisorTuningConfig( @JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, - @JsonProperty("publishTimeout") Long publishTimeout, - @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, + @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, // for backward compatibility @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically, @JsonProperty("workerThreads") Integer workerThreads, @JsonProperty("chatThreads") Integer chatThreads, @@ -63,7 +62,8 @@ public KafkaSupervisorTuningConfig( indexSpec, buildV9Directly, reportParseExceptions, - publishTimeout, + // Supervised kafka tasks should respect KafkaSupervisorIOConfig.completionTimeout instead of + // handoffConditionTimeout handoffConditionTimeout, resetOffsetAutomatically ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 96ac70c86580..52f751757fd1 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1381,7 +1381,6 @@ private KafkaIndexTask 
createTask( null, buildV9Directly, reportParseExceptions, - null, handoffConditionTimeout, resetOffsetAutomatically ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java index 1d5d453db501..d42b44c5475d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -113,7 +113,6 @@ public void testCopyOf() throws Exception new IndexSpec(), true, true, - null, 5L, null ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index c811b63f8053..17507339da38 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -178,7 +178,6 @@ public void setUp() throws Exception false, null, null, - null, numThreads, TEST_CHAT_THREADS, TEST_CHAT_RETRIES, diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java index fdb3bd137a77..f869ab6894c7 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java @@ -80,14 +80,6 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe */ List getSegments(); - /** - * Indicates that the given segment exists. - * - * @param identifier segment to examine - * @return true if exists. 
- */ - boolean containsSegment(SegmentIdentifier identifier); - /** * Returns the number of rows in a particular pending segment. * diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index db2e9769e86f..1c3e2794b94d 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -230,12 +230,6 @@ public List getSegments() return ImmutableList.copyOf(sinks.keySet()); } - @Override - public boolean containsSegment(SegmentIdentifier identifier) - { - return sinks.containsKey(identifier); - } - @Override public int getRowCount(final SegmentIdentifier identifier) { diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java index 58ba077a666e..e4647789851a 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java @@ -35,6 +35,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import io.druid.concurrent.Execs; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; @@ -44,7 +45,6 @@ import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; -import io.druid.timeline.DataSegment; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -60,6 +60,7 @@ import java.util.Random; import java.util.Set; import 
java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -91,17 +92,15 @@ public class FiniteAppenderatorDriver implements Closeable // sequenceName -> start of segment interval -> segment we're currently adding data to private final Map> activeSegments = new TreeMap<>(); - // sequenceName -> list of segmentIdentifiers + // sequenceName -> list of identifiers of segments waiting for being published + // publishPendingSegments is always a super set of activeSegments because there can be some segments to which data + // are not added anymore, but not published yet. private final Map> publishPendingSegments = new HashMap<>(); // sequenceName -> most recently allocated segment private final Map lastSegmentIds = Maps.newHashMap(); - // Notified when segments are dropped. - private final Object handoffMonitor = new Object(); - private final ListeningExecutorService publishExecutor; - private final ListeningExecutorService handoffExecutor; /** * Create a driver. @@ -132,7 +131,6 @@ public FiniteAppenderatorDriver( this.handoffConditionTimeout = handoffConditionTimeout; this.metrics = Preconditions.checkNotNull(metrics, "metrics"); this.publishExecutor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d")); - this.handoffExecutor = MoreExecutors.listeningDecorator(Execs.singleThreaded("handoff-%d")); } /** @@ -265,9 +263,12 @@ public Object persist(final Committer committer) throws InterruptedException * Register the segments in the given {@link SegmentsAndMetadata} to be handed off and execute a background task which * waits until the hand off completes. * - * @param segmentsAndMetadata - * @return - * @throws InterruptedException + * @param segmentsAndMetadata the result segments and metadata of + * {@link #publish(TransactionalSegmentPublisher, Committer, Collection)} + * + * @return null if the input segmentsAndMetadata is null. 
Otherwise, a {@link ListenableFuture} for the submitted task + * which returns {@link SegmentsAndMetadata} containing the segments successfully handed off and the metadata + * of the caller of {@link FiniteAppenderatorDriverMetadata} */ public ListenableFuture registerHandoff(SegmentsAndMetadata segmentsAndMetadata) throws InterruptedException @@ -276,52 +277,38 @@ public ListenableFuture registerHandoff(SegmentsAndMetadata return Futures.immediateFuture(null); } else { - final Object waitForHandoffMonitorStart = new Object(); - - final ListenableFuture future = handoffExecutor.submit( - () -> { - final List waitingSegmentIdList = segmentsAndMetadata.getSegments().stream() - .map(SegmentIdentifier::fromDataSegment) - .collect(Collectors.toList()); - log.info("Awaiting handoff of segments: [%s]", waitingSegmentIdList); + final List waitingSegmentIdList = segmentsAndMetadata.getSegments().stream() + .map(SegmentIdentifier::fromDataSegment) + .collect(Collectors.toList()); - synchronized (waitForHandoffMonitorStart) { - waitForHandoffMonitorStart.notify(); - } - synchronized (handoffMonitor) { - while (waitingSegmentIdList.stream().anyMatch(appenderator::containsSegment)) { - handoffMonitor.wait(); - } - } - - log.info("All segments handed off."); - - return new SegmentsAndMetadata( + if (waitingSegmentIdList.isEmpty()) { + return Futures.immediateFuture( + new SegmentsAndMetadata( segmentsAndMetadata.getSegments(), - ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() - ); - } - ); - - // wait for handoffMonitor starts - log.info("Waiting handoff monitor starts"); - synchronized (waitForHandoffMonitorStart) { - waitForHandoffMonitorStart.wait(handoffConditionTimeout); + ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) + .getCallerMetadata() + ) + ); } - for (final DataSegment dataSegment : segmentsAndMetadata.getSegments()) { + log.info("Register handoff of segments: [%s]", 
waitingSegmentIdList); + + final SettableFuture resultFuture = SettableFuture.create(); + final AtomicInteger numRemainingHandoffSegments = new AtomicInteger(waitingSegmentIdList.size()); + + for (final SegmentIdentifier segmentIdentifier : waitingSegmentIdList) { handoffNotifier.registerSegmentHandoffCallback( new SegmentDescriptor( - dataSegment.getInterval(), - dataSegment.getVersion(), - dataSegment.getShardSpec().getPartitionNum() + segmentIdentifier.getInterval(), + segmentIdentifier.getVersion(), + segmentIdentifier.getShardSpec().getPartitionNum() ), MoreExecutors.sameThreadExecutor(), () -> { - final SegmentIdentifier identifier = SegmentIdentifier.fromDataSegment(dataSegment); - log.info("Segment[%s] successfully handed off, dropping.", identifier); + log.info("Segment[%s] successfully handed off, dropping.", segmentIdentifier); metrics.incrementHandOffCount(); - final ListenableFuture dropFuture = appenderator.drop(identifier); + + final ListenableFuture dropFuture = appenderator.drop(segmentIdentifier); Futures.addCallback( dropFuture, new FutureCallback() @@ -329,18 +316,24 @@ public ListenableFuture registerHandoff(SegmentsAndMetadata @Override public void onSuccess(Object result) { - synchronized (handoffMonitor) { - handoffMonitor.notifyAll(); + if (numRemainingHandoffSegments.decrementAndGet() == 0) { + log.info("All segments handed off."); + resultFuture.set( + new SegmentsAndMetadata( + segmentsAndMetadata.getSegments(), + ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) + .getCallerMetadata() + ) + ); } } @Override public void onFailure(Throwable e) { - log.warn(e, "Failed to drop segment[%s]?!", identifier); - synchronized (handoffMonitor) { - handoffMonitor.notifyAll(); - } + log.warn(e, "Failed to drop segment[%s]?!", segmentIdentifier); + numRemainingHandoffSegments.decrementAndGet(); + resultFuture.setException(e); } } ); @@ -348,7 +341,7 @@ public void onFailure(Throwable e) ); } - return future; + return 
resultFuture; } } @@ -359,7 +352,6 @@ public void onFailure(Throwable e) public void close() { publishExecutor.shutdownNow(); - handoffExecutor.shutdownNow(); handoffNotifier.close(); } @@ -489,7 +481,13 @@ public void onSuccess(@Nullable SegmentsAndMetadata result) { if (result != null) { synchronized (activeSegments) { - sequenceNames.forEach(publishPendingSegments::remove); + // Remove sequenceName from both publishPendingSegments and activeSegments + sequenceNames.forEach( + sequenceName -> { + activeSegments.remove(sequenceName); + publishPendingSegments.remove(sequenceName); + } + ); } } } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java index 47be734651f6..4d9ea819dcea 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java @@ -56,6 +56,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -208,6 +209,78 @@ public void testHandoffTimeout() throws Exception driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS); } + @Test + public void testIncrementalHandoff() throws IOException, InterruptedException, TimeoutException, ExecutionException + { + final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); + + Assert.assertNull(driver.startJob()); + + // Add the first row and publish immediately + { + committerSupplier.setMetadata(1); + Assert.assertTrue(driver.add(ROWS.get(0), "dummy", committerSupplier).isOk()); + + final SegmentsAndMetadata published = driver.publish( + makeOkPublisher(), + committerSupplier.get(), + 
ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); + final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published) + .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertEquals( + ImmutableSet.of( + new SegmentIdentifier(DATA_SOURCE, new Interval("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0)) + ), + asIdentifiers(segmentsAndMetadata.getSegments()) + ); + + Assert.assertEquals(1, segmentsAndMetadata.getCommitMetadata()); + } + + // Add the second and third rows and publish immediately + for (int i = 1; i < ROWS.size(); i++) { + committerSupplier.setMetadata(i + 1); + Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk()); + + final SegmentsAndMetadata published = driver.publish( + makeOkPublisher(), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); + final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published) + .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertEquals( + ImmutableSet.of( + // The second and third rows have the same dataSource, interval, and version, but different shardSpecs of + // different partitionNums + new SegmentIdentifier(DATA_SOURCE, new Interval("2000T01/PT1H"), VERSION, new NumberedShardSpec(i - 1, 0)) + ), + asIdentifiers(segmentsAndMetadata.getSegments()) + ); + + Assert.assertEquals(i + 1, segmentsAndMetadata.getCommitMetadata()); + } + + driver.persist(committerSupplier.get()); + + // There are no remaining rows in the driver, and thus the result must be empty + final SegmentsAndMetadata published = driver.publish( + makeOkPublisher(), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); + final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published) + .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS); + + Assert.assertEquals( + ImmutableSet.of(), + 
asIdentifiers(segmentsAndMetadata.getSegments()) + ); + + Assert.assertEquals(3, segmentsAndMetadata.getCommitMetadata()); + } + private Set asIdentifiers(Iterable segments) { return ImmutableSet.copyOf( From 2b1d421bef7f8c930f2c3a1034fe315a77139927 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 25 May 2017 10:49:45 +0900 Subject: [PATCH 3/7] Remove unused variable --- .../src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java | 1 - .../src/main/java/io/druid/indexing/common/task/IndexTask.java | 1 - .../realtime/appenderator/FiniteAppenderatorDriver.java | 3 --- .../realtime/appenderator/FiniteAppenderatorDriverTest.java | 1 - 4 files changed, 6 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 474bc5a321f4..07e0ae7464b4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -883,7 +883,6 @@ private FiniteAppenderatorDriver newDriver( toolbox.getSegmentHandoffNotifierFactory(), new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), toolbox.getObjectMapper(), - tuningConfig.getHandoffConditionTimeout(), metrics ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 6a01102d38d2..d3a0b9a716b1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -556,7 +556,6 @@ private FiniteAppenderatorDriver newDriver( new NoopSegmentHandoffNotifierFactory(), // don't wait for handoff since we don't serve queries new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), 
toolbox.getObjectMapper(), - 0, metrics ); } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java index e4647789851a..b7fe9268575c 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java @@ -83,7 +83,6 @@ public class FiniteAppenderatorDriver implements Closeable private final SegmentHandoffNotifier handoffNotifier; private final UsedSegmentChecker usedSegmentChecker; private final ObjectMapper objectMapper; - private final long handoffConditionTimeout; private final FireDepartmentMetrics metrics; // All access to "activeSegments", "publishPendingSegments", and "lastSegmentId" must be synchronized on @@ -118,7 +117,6 @@ public FiniteAppenderatorDriver( SegmentHandoffNotifierFactory handoffNotifierFactory, UsedSegmentChecker usedSegmentChecker, ObjectMapper objectMapper, - long handoffConditionTimeout, FireDepartmentMetrics metrics ) { @@ -128,7 +126,6 @@ public FiniteAppenderatorDriver( .createSegmentHandoffNotifier(appenderator.getDataSource()); this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker"); this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper"); - this.handoffConditionTimeout = handoffConditionTimeout; this.metrics = Preconditions.checkNotNull(metrics, "metrics"); this.publishExecutor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d")); } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java index 4d9ea819dcea..1219c64d96e7 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java +++ 
b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java @@ -108,7 +108,6 @@ public void setUp() segmentHandoffNotifierFactory, new TestUsedSegmentChecker(), OBJECT_MAPPER, - HANDOFF_CONDITION_TIMEOUT, new FireDepartmentMetrics() ); } From 7545257202cbcd8944abe3b00aa8038eb9aeb648 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 25 May 2017 18:39:18 +0900 Subject: [PATCH 4/7] Add persist() to Appenderator and more tests for AppenderatorDriver --- .../realtime/appenderator/Appenderator.java | 37 +- .../appenderator/AppenderatorImpl.java | 17 +- .../FiniteAppenderatorDriver.java | 27 +- .../FiniteAppenderatorDriverFailTest.java | 459 ++++++++++++++++++ .../FiniteAppenderatorDriverTest.java | 84 +++- 5 files changed, 597 insertions(+), 27 deletions(-) create mode 100644 server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverFailTest.java diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java index f869ab6894c7..f9d416b9e8f2 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java @@ -27,6 +27,7 @@ import io.druid.segment.incremental.IndexSizeExceededException; import java.io.Closeable; +import java.util.Collection; import java.util.List; /** @@ -61,7 +62,7 @@ public interface Appenderator extends QuerySegmentWalker, Closeable * Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used * asynchronously. *

- * The add, clear, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread. * * @param identifier the segment into which this row should be added * @param row the row to add @@ -96,7 +97,7 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe * for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been * cleared. This may take some time, since all pending persists must finish first. * - * The add, clear, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread. */ void clear() throws InterruptedException; @@ -114,27 +115,47 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe */ ListenableFuture drop(SegmentIdentifier identifier); + /** + * Persist any in-memory indexed data for segments of the given identifiers to durable storage. This may be only + * somewhat durable, e.g. the machine's local disk. The Committer will be made synchronously with the call to + * persist, but will actually be used asynchronously. Any metadata returned by the committer will be associated with + * the data persisted to disk. + *

+ * The add, clear, persist, persistAll, and push methods should all be called from the same thread. + * + * @param identifiers segment identifiers to be persisted + * @param committer a committer associated with all data that has been added to segments of the given identifiers so + * far + * + * @return future that resolves when all pending data to segments of the identifiers has been persisted, contains + * commit metadata for this persist + */ + ListenableFuture persist(Collection identifiers, Committer committer); + /** * Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the * machine's local disk. The Committer will be made synchronously will the call to persistAll, but will actually * be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to * disk. *

- * The add, clear, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread. * * @param committer a committer associated with all data that has been added so far * * @return future that resolves when all pending data has been persisted, contains commit metadata for this persist */ - ListenableFuture persistAll(Committer committer); + default ListenableFuture persistAll(Committer committer) + { + return persist(getSegments(), committer); + } /** - * Merge and push particular segments to deep storage. This will trigger an implicit {@link #persistAll(Committer)} - * using the provided Committer. + * Merge and push particular segments to deep storage. This will trigger an implicit + * {@link #persist(Collection, Committer)} using the provided Committer. *

* After this method is called, you cannot add new data to any segments that were previously under construction. *

- * The add, clear, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread. * * @param identifiers list of segments to push * @param committer a committer associated with all data that has been added so far @@ -142,7 +163,7 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe * @return future that resolves when all segments have been pushed. The segment list will be the list of segments * that have been pushed and the commit metadata from the Committer. */ - ListenableFuture push(List identifiers, Committer committer); + ListenableFuture push(Collection identifiers, Committer committer); /** * Stop any currently-running processing and clean up after ourselves. This will not remove any on-disk persisted diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 1c3e2794b94d..c2bfdf1e9b93 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -80,6 +80,7 @@ import java.nio.channels.FileLock; import java.nio.file.StandardOpenOption; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -346,13 +347,10 @@ public ListenableFuture drop(final SegmentIdentifier identifier) } @Override - public ListenableFuture persistAll(final Committer committer) + public ListenableFuture persist(Collection identifiers, Committer committer) { - // Submit persistAll task to the persistExecutor - final Map commitHydrants = Maps.newHashMap(); final List> indexesToPersist = Lists.newArrayList(); - final Set identifiers = sinks.keySet(); for (SegmentIdentifier identifier : identifiers) { final Sink sink = sinks.get(identifier); 
final List hydrants = Lists.newArrayList(sink); @@ -437,9 +435,16 @@ public String apply(Map.Entry entry) return future; } + @Override + public ListenableFuture persistAll(final Committer committer) + { + // Submit persistAll task to the persistExecutor + return persist(sinks.keySet(), committer); + } + @Override public ListenableFuture push( - final List identifiers, + final Collection identifiers, final Committer committer ) { @@ -454,7 +459,7 @@ public ListenableFuture push( } return Futures.transform( - persistAll(committer), + persist(identifiers, committer), new Function() { @Override diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java index b7fe9268575c..10c9c4cdb388 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java @@ -268,7 +268,6 @@ public Object persist(final Committer committer) throws InterruptedException * of the caller of {@link FiniteAppenderatorDriverMetadata} */ public ListenableFuture registerHandoff(SegmentsAndMetadata segmentsAndMetadata) - throws InterruptedException { if (segmentsAndMetadata == null) { return Futures.immediateFuture(null); @@ -446,13 +445,14 @@ public void onFailure(Throwable e) public void moveSegmentOut(final String sequenceName, final List publish( final TransactionalSegmentPublisher publisher, final Committer committer, final Collection sequenceNames - ) throws InterruptedException + ) { final List theSegments; synchronized (activeSegments) { @@ -492,6 +492,8 @@ public void onSuccess(@Nullable SegmentsAndMetadata result) @Override public void onFailure(Throwable t) { + // The throwable is propagated anyway when get() is called on the future. + // See FiniteAppenderatorDriverFailTest.testInterruptDuringPush(). 
log.error(t, "Failed to publish segments[%s]", theSegments); } } @@ -504,8 +506,7 @@ public void onFailure(Throwable t) * Execute a task in background to publish the given segments. The task blocks until complete. * Retries forever on transient failures, but may exit early on permanent failures. * - * Should be called after all data has been added and persisted through {@link #add(InputRow, String, Supplier)} and - * {@link #persist(Committer)}. + * Should be called after all data has been added through {@link #add(InputRow, String, Supplier)}. * * @param publisher publisher to use for this set of segments * @param wrappedCommitter committer representing all data that has been added so far @@ -517,7 +518,7 @@ private ListenableFuture publish( final TransactionalSegmentPublisher publisher, final WrappedCommitter wrappedCommitter, final List segmentIdentifiers - ) throws InterruptedException + ) { return publishExecutor.submit( () -> { @@ -575,7 +576,7 @@ private ListenableFuture publish( } catch (Exception e) { final long sleepMillis = computeNextRetrySleep(++nTry); - log.warn(e, "Failed publishAll (try %d), retrying in %,dms.", nTry, sleepMillis); + log.warn(e, "Failed publish (try %d), retrying in %,dms.", nTry, sleepMillis); Thread.sleep(sleepMillis); } } @@ -583,6 +584,18 @@ private ListenableFuture publish( ); } + public ListenableFuture publishAndRegisterHandoff( + final TransactionalSegmentPublisher publisher, + final Committer committer, + final Collection sequenceNames + ) + { + return Futures.transform( + publish(publisher, committer, sequenceNames), + this::registerHandoff + ); + } + private interface WrappedCommitter extends Committer { } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverFailTest.java new file mode 100644 index 000000000000..e38470e750db --- /dev/null +++ 
b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverFailTest.java @@ -0,0 +1,459 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.data.input.Committer; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.SegmentDescriptor; +import io.druid.segment.incremental.IndexSizeExceededException; +import io.druid.segment.realtime.FireDepartmentMetrics; +import 
io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestCommitterSupplier; +import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestSegmentAllocator; +import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestSegmentHandoffNotifierFactory; +import io.druid.timeline.DataSegment; +import org.hamcrest.CoreMatchers; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public class FiniteAppenderatorDriverFailTest +{ + private static final String DATA_SOURCE = "foo"; + private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); + private static final long PUBLISH_TIMEOUT = 1000; + + private static final List ROWS = ImmutableList.of( + new MapBasedInputRow( + new DateTime("2000"), + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", "1") + ), + new MapBasedInputRow( + new DateTime("2000T01"), + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", 2.0) + ), + new MapBasedInputRow( + new DateTime("2000T01"), + ImmutableList.of("dim2"), + ImmutableMap.of("dim2", "bar", "met1", 2.0) + ) + ); + + SegmentAllocator allocator; + TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory; + FiniteAppenderatorDriver driver; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setUp() + { + allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR); + 
segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory(); + } + + @After + public void tearDown() throws Exception + { + if (driver != null) { + driver.clear(); + driver.close(); + } + } + + @Test + public void testFailDuringPersist() throws IOException, InterruptedException, TimeoutException, ExecutionException + { + expectedException.expect(TimeoutException.class); + + driver = new FiniteAppenderatorDriver( + createPersistFailAppenderator(), + allocator, + segmentHandoffNotifierFactory, + new NoopUsedSegmentChecker(), + OBJECT_MAPPER, + new FireDepartmentMetrics() + ); + + driver.startJob(); + + final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); + segmentHandoffNotifierFactory.setHandoffDelay(100); + + Assert.assertNull(driver.startJob()); + + for (int i = 0; i < ROWS.size(); i++) { + committerSupplier.setMetadata(i + 1); + Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk()); + } + + driver.publish( + FiniteAppenderatorDriverTest.makeOkPublisher(), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); + } + + @Test + public void testInterruptDuringPush() throws IOException, InterruptedException, TimeoutException, ExecutionException + { + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class)); + + driver = new FiniteAppenderatorDriver( + createPushInterruptAppenderator(), + allocator, + segmentHandoffNotifierFactory, + new NoopUsedSegmentChecker(), + OBJECT_MAPPER, + new FireDepartmentMetrics() + ); + + driver.startJob(); + + final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); + segmentHandoffNotifierFactory.setHandoffDelay(100); + + Assert.assertNull(driver.startJob()); + + for (int i = 0; i < ROWS.size(); i++) { + committerSupplier.setMetadata(i + 1); + Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk()); + } + + 
driver.publish( + FiniteAppenderatorDriverTest.makeOkPublisher(), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); + } + + @Test + public void testFailDuringPush() throws IOException, InterruptedException, TimeoutException, ExecutionException + { + expectedException.expect(TimeoutException.class); + + driver = new FiniteAppenderatorDriver( + createPushFailAppenderator(), + allocator, + segmentHandoffNotifierFactory, + new NoopUsedSegmentChecker(), + OBJECT_MAPPER, + new FireDepartmentMetrics() + ); + + driver.startJob(); + + final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); + segmentHandoffNotifierFactory.setHandoffDelay(100); + + Assert.assertNull(driver.startJob()); + + for (int i = 0; i < ROWS.size(); i++) { + committerSupplier.setMetadata(i + 1); + Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk()); + } + + driver.publish( + FiniteAppenderatorDriverTest.makeOkPublisher(), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); + } + + @Test + public void testFailDuringDrop() throws IOException, InterruptedException, TimeoutException, ExecutionException + { + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(ISE.class)); + expectedException.expectMessage( + "Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]" + ); + + driver = new FiniteAppenderatorDriver( + createDropFailAppenderator(), + allocator, + segmentHandoffNotifierFactory, + new NoopUsedSegmentChecker(), + OBJECT_MAPPER, + new FireDepartmentMetrics() + ); + + driver.startJob(); + + final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); + segmentHandoffNotifierFactory.setHandoffDelay(100); + + Assert.assertNull(driver.startJob()); + + for (int i = 0; i < ROWS.size(); i++) { + committerSupplier.setMetadata(i + 1); + 
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk()); + } + + final SegmentsAndMetadata published = driver.publish( + FiniteAppenderatorDriverTest.makeOkPublisher(), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); + + driver.registerHandoff(published).get(); + } + + private class NoopUsedSegmentChecker implements UsedSegmentChecker + { + @Override + public Set findUsedSegments(Set identifiers) throws IOException + { + return ImmutableSet.of(); + } + } + + static Appenderator createPushFailAppenderator() + { + return new FailableAppenderator().disablePush(); + } + + static Appenderator createPushInterruptAppenderator() + { + return new FailableAppenderator().interruptPush(); + } + + static Appenderator createPersistFailAppenderator() + { + return new FailableAppenderator().disablePersist(); + } + + static Appenderator createDropFailAppenderator() + { + return new FailableAppenderator().disableDrop(); + } + + private static class FailableAppenderator implements Appenderator + + { + private final Map> rows = new HashMap<>(); + + private boolean dropEnabled = true; + private boolean persistEnabled = true; + private boolean pushEnabled = true; + private boolean interruptPush = false; + + private int numRows; + + public FailableAppenderator disableDrop() + { + dropEnabled = false; + return this; + } + + public FailableAppenderator disablePersist() + { + persistEnabled = false; + return this; + } + + public FailableAppenderator disablePush() + { + pushEnabled = false; + interruptPush = false; + return this; + } + + public FailableAppenderator interruptPush() + { + pushEnabled = false; + interruptPush = true; + return this; + } + + @Override + public String getDataSource() + { + return null; + } + + @Override + public Object startJob() + { + return null; + } + + @Override + public int add( + SegmentIdentifier identifier, InputRow row, Supplier committerSupplier + ) throws 
IndexSizeExceededException, SegmentNotWritableException + { + rows.computeIfAbsent(identifier, k -> new ArrayList<>()).add(row); + return ++numRows; + } + + @Override + public List getSegments() + { + return ImmutableList.copyOf(rows.keySet()); + } + + @Override + public int getRowCount(SegmentIdentifier identifier) + { + final List rows = this.rows.get(identifier); + if (rows != null) { + return rows.size(); + } else { + return 0; + } + } + + @Override + public void clear() throws InterruptedException + { + rows.clear(); + } + + @Override + public ListenableFuture drop(SegmentIdentifier identifier) + { + if (dropEnabled) { + rows.remove(identifier); + return Futures.immediateFuture(null); + } else { + return Futures.immediateFailedFuture(new ISE("Fail test while dropping segment[%s]", identifier)); + } + } + + @Override + public ListenableFuture persist( + Collection identifiers, Committer committer + ) + { + if (persistEnabled) { + // do nothing + return Futures.immediateFuture(committer.getMetadata()); + } else { + return Futures.immediateFailedFuture(new ISE("Fail test while persisting segments[%s]", identifiers)); + } + } + + @Override + public ListenableFuture push( + Collection identifiers, Committer committer + ) + { + if (pushEnabled) { + final List segments = identifiers.stream() + .map( + id -> new DataSegment( + id.getDataSource(), + id.getInterval(), + id.getVersion(), + ImmutableMap.of(), + ImmutableList.of(), + ImmutableList.of(), + id.getShardSpec(), + 0, + 0 + ) + ) + .collect(Collectors.toList()); + return Futures.transform( + persist(identifiers, committer), + (Function) commitMetadata -> new SegmentsAndMetadata(segments, commitMetadata) + ); + } else { + if (interruptPush) { + return new AbstractFuture() + { + @Override + public SegmentsAndMetadata get(long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException, ExecutionException + { + throw new InterruptedException("Interrupt test while pushing segments"); + } + + @Override + 
public SegmentsAndMetadata get() throws InterruptedException, ExecutionException + { + throw new InterruptedException("Interrupt test while pushing segments"); + } + }; + } else { + return Futures.immediateFailedFuture(new ISE("Fail test while pushing segments[%s]", identifiers)); + } + } + } + + @Override + public void close() + { + + } + + @Override + public QueryRunner getQueryRunnerForIntervals( + Query query, Iterable intervals + ) + { + throw new UnsupportedOperationException(); + } + + @Override + public QueryRunner getQueryRunnerForSegments( + Query query, Iterable specs + ) + { + throw new UnsupportedOperationException(); + } + } +} diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java index 1219c64d96e7..e725c4a95581 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java @@ -29,6 +29,9 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; @@ -209,7 +212,7 @@ public void testHandoffTimeout() throws Exception } @Test - public void testIncrementalHandoff() throws IOException, InterruptedException, TimeoutException, ExecutionException + public void testPublishPerRow() throws IOException, InterruptedException, TimeoutException, ExecutionException { final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); @@ -280,6 +283,60 @@ public void testIncrementalHandoff() throws IOException, 
InterruptedException, T Assert.assertEquals(3, segmentsAndMetadata.getCommitMetadata()); } + @Test + public void testIncrementalHandoff() throws Exception + { + final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); + + Assert.assertNull(driver.startJob()); + + committerSupplier.setMetadata(1); + Assert.assertTrue(driver.add(ROWS.get(0), "sequence_0", committerSupplier).isOk()); + + for (int i = 1; i < ROWS.size(); i++) { + committerSupplier.setMetadata(i + 1); + Assert.assertTrue(driver.add(ROWS.get(i), "sequence_1", committerSupplier).isOk()); + } + + final ListenableFuture futureForSequence0 = driver.publishAndRegisterHandoff( + makeOkPublisher(), + committerSupplier.get(), + ImmutableList.of("sequence_0") + ); + + final ListenableFuture futureForSequence1 = driver.publishAndRegisterHandoff( + makeOkPublisher(), + committerSupplier.get(), + ImmutableList.of("sequence_1") + ); + + final SegmentsAndMetadata handedoffFromSequence0 = futureForSequence0.get( + HANDOFF_CONDITION_TIMEOUT, + TimeUnit.MILLISECONDS + ); + final SegmentsAndMetadata handedoffFromSequence1 = futureForSequence1.get( + HANDOFF_CONDITION_TIMEOUT, + TimeUnit.MILLISECONDS + ); + + Assert.assertEquals( + ImmutableSet.of( + new SegmentIdentifier(DATA_SOURCE, new Interval("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0)) + ), + asIdentifiers(handedoffFromSequence0.getSegments()) + ); + + Assert.assertEquals( + ImmutableSet.of( + new SegmentIdentifier(DATA_SOURCE, new Interval("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0)) + ), + asIdentifiers(handedoffFromSequence1.getSegments()) + ); + + Assert.assertEquals(3, handedoffFromSequence0.getCommitMetadata()); + Assert.assertEquals(3, handedoffFromSequence1.getCommitMetadata()); + } + private Set asIdentifiers(Iterable segments) { return ImmutableSet.copyOf( @@ -297,7 +354,7 @@ public SegmentIdentifier apply(DataSegment input) ); } - private TransactionalSegmentPublisher makeOkPublisher() + static 
TransactionalSegmentPublisher makeOkPublisher() { return new TransactionalSegmentPublisher() { @@ -309,7 +366,7 @@ public boolean publishSegments(Set segments, Object commitMetadata) }; } - private static class TestCommitterSupplier implements Supplier + static class TestCommitterSupplier implements Supplier { private final AtomicReference metadata = new AtomicReference<>(); @@ -339,7 +396,7 @@ public void run() } } - private static class TestSegmentAllocator implements SegmentAllocator + static class TestSegmentAllocator implements SegmentAllocator { private final String dataSource; private final Granularity granularity; @@ -374,15 +431,21 @@ public SegmentIdentifier allocate( } } - private static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory + static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory { private boolean handoffEnabled = true; + private long handoffDelay; public void disableHandoff() { handoffEnabled = false; } + public void setHandoffDelay(long delay) + { + handoffDelay = delay; + } + @Override public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) { @@ -396,7 +459,16 @@ public boolean registerSegmentHandoffCallback( ) { if (handoffEnabled) { - // Immediate handoff + + if (handoffDelay > 0) { + try { + Thread.sleep(handoffDelay); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + exec.execute(handOffRunnable); } return true; From 408a783807a68fe07508c3b765f1f32f2e868069 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 26 May 2017 01:16:59 +0900 Subject: [PATCH 5/7] Remove unused imports --- .../realtime/appenderator/FiniteAppenderatorDriverTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java index e725c4a95581..9357a051d5d8 100644 
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java @@ -29,8 +29,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; From 073578c6cf07b223278bbe3bfac4172175856f39 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 26 May 2017 01:24:06 +0900 Subject: [PATCH 6/7] Fix strict build --- .../realtime/appenderator/FiniteAppenderatorDriverFailTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverFailTest.java index e38470e750db..489a1ed2864c 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverFailTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverFailTest.java @@ -251,7 +251,7 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo driver.registerHandoff(published).get(); } - private class NoopUsedSegmentChecker implements UsedSegmentChecker + private static class NoopUsedSegmentChecker implements UsedSegmentChecker { @Override public Set findUsedSegments(Set identifiers) throws IOException From e81fe17b9e00e45d2233f7220b8c0bee30392d1f Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 1 Jun 2017 23:53:10 +0900 Subject: [PATCH 7/7] Address comments --- .../main/java/io/druid/indexing/common/task/IndexTask.java | 2 +- .../io/druid/segment/realtime/appenderator/Appenderator.java | 4 ++-- 2 
files changed, 3 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index d3a0b9a716b1..6a826d860545 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -500,7 +500,7 @@ public boolean publishSegments(Set segments, Object commitMetadata) publisher, committerSupplier.get(), sequenceNameToShardSpecMap.keySet() - ).get(ingestionSchema.getTuningConfig().getPublishTimeout(), TimeUnit.MILLISECONDS); + ).get(publishTimeout, TimeUnit.MILLISECONDS); } if (published == null) { diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java index f9d416b9e8f2..abfdc9f828f3 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java @@ -117,7 +117,7 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe /** * Persist any in-memory indexed data for segments of the given identifiers to durable storage. This may be only - * somewhat durable, e.g. the machine's local disk. The Committer will be made synchronously will the call to + * somewhat durable, e.g. the machine's local disk. The Committer will be made synchronously with the call to * persist, but will actually be used asynchronously. Any metadata returned by the committer will be associated with * the data persisted to disk. *

@@ -134,7 +134,7 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe /** * Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the - * machine's local disk. The Committer will be made synchronously will the call to persistAll, but will actually + * machine's local disk. The Committer will be made synchronously with the call to persistAll, but will actually * be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to * disk. *