From 888b9c512b8a784fb2766b41a3816b894777d239 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 22 Jan 2018 09:56:02 -0800 Subject: [PATCH] Fix early publishing to early pushing in batch indexing & refactor appenderatorDriver --- .../druid/indexing/kafka/KafkaIndexTask.java | 38 +- .../druid/indexing/common/task/IndexTask.java | 260 +++--- .../common/task/CompactionTaskTest.java | 3 +- .../indexing/common/task/IndexTaskTest.java | 13 +- .../indexing/common/task/TaskSerdeTest.java | 4 +- .../indexing/overlord/TaskLifecycleTest.java | 6 +- .../realtime/appenderator/Appenderator.java | 31 +- .../appenderator/AppenderatorDriver.java | 776 ------------------ .../AppenderatorDriverAddResult.java | 2 +- .../AppenderatorDriverMetadata.java | 20 +- .../appenderator/AppenderatorImpl.java | 148 ++-- .../appenderator/BaseAppenderatorDriver.java | 541 ++++++++++++ .../appenderator/BatchAppenderatorDriver.java | 209 +++++ .../appenderator/SegmentWithState.java | 153 ++++ .../appenderator/SegmentsAndMetadata.java | 4 +- .../StreamAppenderatorDriver.java | 416 ++++++++++ .../TransactionalSegmentPublisher.java | 3 +- .../BatchAppenderatorDriverTest.java | 189 +++++ .../appenderator/SegmentWithStateTest.java | 54 ++ ... => StreamAppenderatorDriverFailTest.java} | 28 +- ...java => StreamAppenderatorDriverTest.java} | 62 +- .../appenderator/TestUsedSegmentChecker.java | 66 ++ 22 files changed, 1932 insertions(+), 1094 deletions(-) delete mode 100644 server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java create mode 100644 server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java create mode 100644 server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java create mode 100644 server/src/main/java/io/druid/segment/realtime/appenderator/SegmentWithState.java create mode 100644 server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java create mode 100644 server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java create mode 100644 server/src/test/java/io/druid/segment/realtime/appenderator/SegmentWithStateTest.java rename server/src/test/java/io/druid/segment/realtime/appenderator/{AppenderatorDriverFailTest.java => StreamAppenderatorDriverFailTest.java} (94%) rename server/src/test/java/io/druid/segment/realtime/appenderator/{AppenderatorDriverTest.java => StreamAppenderatorDriverTest.java} (90%) create mode 100644 server/src/test/java/io/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 4a2ecbbae10c..3fd6de7caf5c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -82,9 +82,9 @@ import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; -import io.druid.segment.realtime.appenderator.AppenderatorDriver; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver; import 
io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; @@ -136,7 +136,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -187,7 +186,7 @@ public enum Status private TaskToolbox toolbox; private volatile Appenderator appenderator = null; - private volatile AppenderatorDriver driver = null; + private volatile StreamAppenderatorDriver driver = null; private volatile FireDepartmentMetrics fireDepartmentMetrics = null; private volatile DateTime startTime; private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread) @@ -372,7 +371,7 @@ private void createAndStartPublishExecutor() Joiner.on(", ").join( result.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()) ), - result.getCommitMetadata() + Preconditions.checkNotNull(result.getCommitMetadata(), "commitMetadata") ); } @@ -423,9 +422,11 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception if (getContext() != null && getContext().get("checkpoints") != null) { log.info("Got checkpoints [%s]", (String) getContext().get("checkpoints")); final TreeMap> checkpoints = toolbox.getObjectMapper().readValue( - (String) getContext().get("checkpoints"), new TypeReference>>() + (String) getContext().get("checkpoints"), + new TypeReference>>() { - }); + } + ); Iterator>> sequenceOffsets = checkpoints.entrySet().iterator(); Map.Entry> previous = sequenceOffsets.next(); @@ -709,7 +710,7 @@ public void run() if (addResult.isOk()) { // If the number of rows in the segment exceeds the threshold after adding a row, - // move the segment out from the active segments of AppenderatorDriver to make a new segment. + // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment. 
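The comment above describes the rotation rule used by both Kafka ingestion paths in this patch: after each successful add, if the segment's row count has passed `tuningConfig.getMaxRowsPerSegment()` (the check that follows this comment), the task retires that segment from the driver's active set — and, in the non-legacy path, marks the sequence for checkpointing — so the next row for the same interval allocates a fresh segment. A condensed, self-contained model of that control flow; the names here are illustrative stand-ins, not Druid APIs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SegmentRotationSketch
{
  static final int MAX_ROWS_PER_SEGMENT = 3; // stand-in for tuningConfig.getMaxRowsPerSegment()

  static final Map<String, Integer> rowCounts = new HashMap<>(); // rows per segment
  static final List<String> active = new ArrayList<>();          // the driver's active set
  static final List<String> movedOut = new ArrayList<>();        // what moveSegmentOut() would receive

  public static void main(String[] args)
  {
    for (int row = 0; row < 10; row++) {
      String segment = activeSegmentOrAllocate();
      int numRowsInSegment = rowCounts.merge(segment, 1, Integer::sum);
      // Mirrors the strict ">" check in the patch: a segment that grows past the
      // threshold is retired from the active set so a new one gets allocated.
      if (numRowsInSegment > MAX_ROWS_PER_SEGMENT) {
        active.remove(segment);
        movedOut.add(segment);
      }
    }
    System.out.println("moved out: " + movedOut + ", still active: " + active);
  }

  static String activeSegmentOrAllocate()
  {
    if (active.isEmpty()) {
      active.add("segment_" + rowCounts.size());
    }
    return active.get(0);
  }
}
```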
if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) { if (!sequenceToUse.isCheckpointed()) { sequenceToCheckpoint = sequenceToUse; @@ -747,7 +748,6 @@ public void onFailure(Throwable t) } } ); - } } catch (ParseException e) { @@ -850,7 +850,7 @@ public void onFailure(Throwable t) Joiner.on(", ").join( handedOff.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()) ), - handedOff.getCommitMetadata() + Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata") ); } } @@ -940,9 +940,9 @@ private TaskStatus runLegacy(final TaskToolbox toolbox) throws Exception ); try ( - final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox); - final AppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics); - final KafkaConsumer consumer = newConsumer() + final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox); + final StreamAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics); + final KafkaConsumer consumer = newConsumer() ) { toolbox.getDataSegmentServerAnnouncer().announce(); toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); @@ -1104,7 +1104,7 @@ public void run() if (addResult.isOk()) { // If the number of rows in the segment exceeds the threshold after adding a row, - // move the segment out from the active segments of AppenderatorDriver to make a new segment. + // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment. if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) { segmentsToMoveOut.computeIfAbsent(sequenceName, k -> new HashSet<>()) .add(addResult.getSegmentIdentifier()); @@ -1170,7 +1170,7 @@ public void run() final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> { final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( - ((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS), + ((Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_NEXT_PARTITIONS), KafkaPartitions.class ); @@ -1230,7 +1230,7 @@ public String apply(DataSegment input) } ) ), - handedOff.getCommitMetadata() + Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata") ); } } @@ -1269,7 +1269,7 @@ private void checkAndMaybeThrowException() } private void maybePersistAndPublishSequences(Supplier committerSupplier) - throws ExecutionException, InterruptedException + throws InterruptedException { for (SequenceMetadata sequenceMetadata : sequences) { sequenceMetadata.updateAssignments(nextOffsets); @@ -1793,13 +1793,13 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox ); } - private AppenderatorDriver newDriver( + private StreamAppenderatorDriver newDriver( final Appenderator appenderator, final TaskToolbox toolbox, final FireDepartmentMetrics metrics ) { - return new AppenderatorDriver( + return new StreamAppenderatorDriver( appenderator, new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema), toolbox.getSegmentHandoffNotifierFactory(), @@ -2222,7 +2222,7 @@ public TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean u { return (segments, commitMetadata) -> { final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( - ((Map) commitMetadata).get(METADATA_PUBLISH_PARTITIONS), + ((Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_PUBLISH_PARTITIONS), KafkaPartitions.class 
); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 1a9bade78c9f..55296f05a8de 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -24,11 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -36,7 +34,6 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import com.google.common.util.concurrent.ListenableFuture; -import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; @@ -57,7 +54,6 @@ import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.ParseException; -import io.druid.segment.writeout.SegmentWriteOutMediumFactory; import io.druid.query.DruidMetrics; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; @@ -71,15 +67,15 @@ import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.AppenderatorConfig; -import io.druid.segment.realtime.appenderator.AppenderatorDriver; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; -import io.druid.segment.realtime.plumber.Committers; -import io.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory; +import io.druid.segment.writeout.SegmentWriteOutMediumFactory; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.NoneShardSpec; @@ -546,7 +542,7 @@ private static BiFunction getShardSpecCreateFunctio } /** - * This method reads input data row by row and adds the read row to a proper segment using {@link AppenderatorDriver}. + * This method reads input data row by row and adds the read row to a proper segment using {@link BaseAppenderatorDriver}. * If there is no segment for the row, a new one is created. Segments can be published in the middle of reading inputs * if one of below conditions are satisfied. * @@ -555,7 +551,7 @@ private static BiFunction getShardSpecCreateFunctio * If the number of rows in a segment exceeds {@link IndexTuningConfig#targetPartitionSize} * *
* <li>
- * If the number of rows added to {@link AppenderatorDriver} so far exceeds {@link IndexTuningConfig#maxTotalRows}
+ * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link IndexTuningConfig#maxTotalRows}
  • * * @@ -590,11 +586,7 @@ dataSchema, new RealtimeIOConfig(null, null, null), null final IndexIOConfig ioConfig = ingestionSchema.getIOConfig(); final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig; - final long publishTimeout = tuningConfig.getPublishTimeout(); - final long maxRowsInAppenderator = tuningConfig.getMaxTotalRows(); - final int maxRowsInSegment = tuningConfig.getTargetPartitionSize() == null - ? Integer.MAX_VALUE - : tuningConfig.getTargetPartitionSize(); + final long pushTimeout = tuningConfig.getPushTimeout(); final boolean isGuaranteedRollup = isGuaranteedRollup(ioConfig, tuningConfig); final SegmentAllocator segmentAllocator; @@ -644,7 +636,12 @@ dataSchema, new RealtimeIOConfig(null, null, null), null } final int partitionNum = counters.computeIfAbsent(interval, x -> new AtomicInteger()).getAndIncrement(); - return new SegmentIdentifier(getDataSource(), interval, findVersion(versions, interval), new NumberedShardSpec(partitionNum, 0)); + return new SegmentIdentifier( + getDataSource(), + interval, + findVersion(versions, interval), + new NumberedShardSpec(partitionNum, 0) + ); }; } @@ -654,97 +651,73 @@ dataSchema, new RealtimeIOConfig(null, null, null), null }; try ( - final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig); - final AppenderatorDriver driver = newDriver( - appenderator, - toolbox, - segmentAllocator, - fireDepartmentMetrics - ); - final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) + final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig); + final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator); + final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) ) { - final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); + driver.startJob(); - if (driver.startJob() != null) { - driver.clear(); - } - - try { - while (firehose.hasMore()) { - try { - final InputRow inputRow = firehose.nextRow(); + while (firehose.hasMore()) { + try { + final InputRow inputRow = firehose.nextRow(); - if (inputRow == null) { - fireDepartmentMetrics.incrementThrownAway(); - continue; - } + if (inputRow == null) { + fireDepartmentMetrics.incrementThrownAway(); + continue; + } - final Optional optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp()); - if (!optInterval.isPresent()) { - fireDepartmentMetrics.incrementThrownAway(); - continue; - } + final Optional optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp()); + if (!optInterval.isPresent()) { + fireDepartmentMetrics.incrementThrownAway(); + continue; + } - final String sequenceName; - - if (isGuaranteedRollup) { - // Sequence name is based solely on the shardSpec, and there will only be one segment per sequence. - final Interval interval = optInterval.get(); - final ShardSpec shardSpec = shardSpecs.getShardSpec(interval, inputRow); - sequenceName = Appenderators.getSequenceName(interval, findVersion(versions, interval), shardSpec); - } else { - // Segments are created as needed, using a single sequence name. They may be allocated from the overlord - // (in append mode) or may be created on our own authority (in overwrite mode). 
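The comment above (unchanged by this patch apart from re-indentation) encodes the allocation policy: with guaranteed rollup, the sequence name is derived from the interval, version, and shardSpec, so each sequence maps to exactly one predetermined segment; in best-effort mode the task id names a single sequence and segments are allocated on demand. A toy rendering of that rule — the string format is illustrative, not the actual output of `Appenderators.getSequenceName()`:

```java
public class SequenceNameSketch
{
  static String sequenceName(
      boolean isGuaranteedRollup,
      String taskId,
      String interval,     // e.g. "2018-01-01/2018-01-02"
      String version,      // version assigned for the interval
      int partitionNum     // stand-in for the shardSpec
  )
  {
    // Guaranteed rollup: one sequence per (interval, version, shardSpec), so the
    // set of segments is fixed up front. Otherwise: one task-wide sequence.
    return isGuaranteedRollup
           ? interval + "_" + version + "_" + partitionNum
           : taskId;
  }

  public static void main(String[] args)
  {
    System.out.println(sequenceName(true, "index_task_1", "2018-01-01/2018-01-02", "v1", 0));
    System.out.println(sequenceName(false, "index_task_1", "2018-01-01/2018-01-02", "v1", 0));
  }
}
```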
- sequenceName = getId(); - } - final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier); - - if (addResult.isOk()) { - // incremental segment publishment is allowed only when rollup don't have to be perfect. - if (!isGuaranteedRollup && - (addResult.getNumRowsInSegment() >= maxRowsInSegment || - addResult.getTotalNumRowsInAppenderator() >= maxRowsInAppenderator)) { - // There can be some segments waiting for being published even though any rows won't be added to them. - // If those segments are not published here, the available space in appenderator will be kept to be small - // which makes the size of segments smaller. - final SegmentsAndMetadata published = awaitPublish( - driver.publishAll( - publisher, - committerSupplier.get() - ), - publishTimeout - ); - // Even though IndexTask uses NoopHandoffNotifier which does nothing for segment handoff, - // the below code is needed to update the total number of rows added to the appenderator so far. - // See AppenderatorDriver.registerHandoff() and Appenderator.drop(). - // A hard-coded timeout is used here because the below get() is expected to return immediately. - driver.registerHandoff(published).get(30, TimeUnit.SECONDS); - } - } else { - throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp()); - } + final String sequenceName; - fireDepartmentMetrics.incrementProcessed(); + if (isGuaranteedRollup) { + // Sequence name is based solely on the shardSpec, and there will only be one segment per sequence. + final Interval interval = optInterval.get(); + final ShardSpec shardSpec = shardSpecs.getShardSpec(interval, inputRow); + sequenceName = Appenderators.getSequenceName(interval, findVersion(versions, interval), shardSpec); + } else { + // Segments are created as needed, using a single sequence name. They may be allocated from the overlord + // (in append mode) or may be created on our own authority (in overwrite mode). + sequenceName = getId(); } - catch (ParseException e) { - if (tuningConfig.isReportParseExceptions()) { - throw e; - } else { - fireDepartmentMetrics.incrementUnparseable(); + final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName); + + if (addResult.isOk()) { + // incremental segment publishment is allowed only when rollup don't have to be perfect. + if (!isGuaranteedRollup && + (exceedMaxRowsInSegment(addResult.getNumRowsInSegment(), tuningConfig) || + exceedMaxRowsInAppenderator(addResult.getTotalNumRowsInAppenderator(), tuningConfig))) { + // There can be some segments waiting for being published even though any rows won't be added to them. + // If those segments are not published here, the available space in appenderator will be kept to be small + // which makes the size of segments smaller. 
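This is the behavioral core of the patch title: in best-effort rollup mode, once either the per-segment or the whole-appenderator row limit trips, the task now only *pushes* segments to deep storage and drops them from memory (`driver.pushAllAndClear(pushTimeout)` in the added lines that follow); *publishing* to the metadata store happens exactly once, after all input is read. A self-contained model of the loop, with the same null-tolerant limit checks as the added `exceedMaxRowsInSegment`/`exceedMaxRowsInAppenderator` helpers — all names and numbers here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementalPushSketch
{
  // Stand-ins for tuningConfig.getTargetPartitionSize() / getMaxTotalRows();
  // either can be null (e.g. when numShards is set for perfect rollup).
  static final Integer MAX_ROWS_PER_SEGMENT = 2;
  static final Long MAX_TOTAL_ROWS = 3L;

  public static void main(String[] args)
  {
    List<String> pushes = new ArrayList<>();
    int rowsInSegment = 0;
    long totalRowsInAppenderator = 0;

    for (int row = 0; row < 7; row++) {
      rowsInSegment++;
      totalRowsInAppenderator++;
      // Null-safe threshold checks, shaped like the two new helper methods.
      boolean segmentFull = MAX_ROWS_PER_SEGMENT != null && MAX_ROWS_PER_SEGMENT <= rowsInSegment;
      boolean appenderatorFull = MAX_TOTAL_ROWS != null && MAX_TOTAL_ROWS <= totalRowsInAppenderator;
      if (segmentFull || appenderatorFull) {
        // Models driver.pushAllAndClear(pushTimeout): every open segment goes to
        // deep storage and is dropped from memory; nothing is published yet.
        pushes.add(totalRowsInAppenderator + " rows");
        rowsInSegment = 0;
        totalRowsInAppenderator = 0;
      }
    }
    System.out.println("pushed " + pushes + "; publishAll() runs once, after the loop");
  }
}
```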
+ final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout); + log.info("Pushed segments[%s]", pushed.getSegments()); } + } else { + throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp()); + } + + fireDepartmentMetrics.incrementProcessed(); + } + catch (ParseException e) { + if (tuningConfig.isReportParseExceptions()) { + throw e; + } else { + fireDepartmentMetrics.incrementUnparseable(); } } } - finally { - driver.persist(committerSupplier.get()); - } + + final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout); + log.info("Pushed segments[%s]", pushed.getSegments()); final SegmentsAndMetadata published = awaitPublish( - driver.publishAll( - publisher, - committerSupplier.get() - ), - publishTimeout + driver.publishAll(publisher), + pushTimeout ); if (published == null) { @@ -755,14 +728,7 @@ dataSchema, new RealtimeIOConfig(null, null, null), null "Published segments[%s]", Joiner.on(", ").join( Iterables.transform( published.getSegments(), - new Function() - { - @Override - public String apply(DataSegment input) - { - return input.getIdentifier(); - } - } + DataSegment::getIdentifier ) ) ); @@ -774,11 +740,24 @@ public String apply(DataSegment input) } } + private static boolean exceedMaxRowsInSegment(int numRowsInSegment, IndexTuningConfig indexTuningConfig) + { + // maxRowsInSegment should be null if numShards is set in indexTuningConfig + final Integer maxRowsInSegment = indexTuningConfig.getTargetPartitionSize(); + return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment; + } + + private static boolean exceedMaxRowsInAppenderator(long numRowsInAppenderator, IndexTuningConfig indexTuningConfig) + { + // maxRowsInAppenderator should be null if numShards is set in indexTuningConfig + final Long maxRowsInAppenderator = indexTuningConfig.getMaxTotalRows(); + return maxRowsInAppenderator != null && maxRowsInAppenderator <= numRowsInAppenderator; + } + private static SegmentsAndMetadata awaitPublish( ListenableFuture publishFuture, long publishTimeout - ) - throws ExecutionException, InterruptedException, TimeoutException + ) throws ExecutionException, InterruptedException, TimeoutException { if (publishTimeout == 0) { return publishFuture.get(); @@ -805,20 +784,16 @@ private static Appenderator newAppenderator( ); } - private static AppenderatorDriver newDriver( + private static BatchAppenderatorDriver newDriver( final Appenderator appenderator, final TaskToolbox toolbox, - final SegmentAllocator segmentAllocator, - final FireDepartmentMetrics metrics + final SegmentAllocator segmentAllocator ) { - return new AppenderatorDriver( + return new BatchAppenderatorDriver( appenderator, segmentAllocator, - new NoopSegmentHandoffNotifierFactory(), // don't wait for handoff since we don't serve queries - new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), - toolbox.getObjectMapper(), - metrics + new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()) ); } @@ -950,13 +925,13 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false; private static final boolean DEFAULT_GUARANTEE_ROLLUP = false; private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; - private static final long DEFAULT_PUBLISH_TIMEOUT = 0; + private static final long DEFAULT_PUSH_TIMEOUT = 0; static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000; private final Integer targetPartitionSize; private final int maxRowsInMemory; - 
private final int maxTotalRows; + private final Long maxTotalRows; private final Integer numShards; private final IndexSpec indexSpec; private final File basePersistDirectory; @@ -964,7 +939,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private final boolean forceExtendableShardSpecs; private final boolean forceGuaranteedRollup; private final boolean reportParseExceptions; - private final long publishTimeout; + private final long pushTimeout; @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; @@ -972,7 +947,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi public IndexTuningConfig( @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, - @JsonProperty("maxTotalRows") @Nullable Integer maxTotalRows, + @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, @JsonProperty("rowFlushBoundary") @Nullable Integer rowFlushBoundary_forBackCompatibility, // DEPRECATED @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @@ -982,7 +957,8 @@ public IndexTuningConfig( @JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs, @JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup, @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, - @JsonProperty("publishTimeout") @Nullable Long publishTimeout, + @JsonProperty("publishTimeout") @Nullable Long publishTimeout, // deprecated + @JsonProperty("pushTimeout") @Nullable Long pushTimeout, @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory ) { @@ -996,7 +972,7 @@ public IndexTuningConfig( forceExtendableShardSpecs, forceGuaranteedRollup, reportParseExceptions, - publishTimeout, + pushTimeout != null ? pushTimeout : publishTimeout, null, segmentWriteOutMediumFactory ); @@ -1010,14 +986,14 @@ private IndexTuningConfig() private IndexTuningConfig( @Nullable Integer targetPartitionSize, @Nullable Integer maxRowsInMemory, - @Nullable Integer maxTotalRows, + @Nullable Long maxTotalRows, @Nullable Integer numShards, @Nullable IndexSpec indexSpec, @Nullable Integer maxPendingPersists, @Nullable Boolean forceExtendableShardSpecs, @Nullable Boolean forceGuaranteedRollup, @Nullable Boolean reportParseExceptions, - @Nullable Long publishTimeout, + @Nullable Long pushTimeout, @Nullable File basePersistDirectory, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory ) @@ -1027,15 +1003,9 @@ private IndexTuningConfig( "targetPartitionSize and numShards cannot both be set" ); - this.targetPartitionSize = numShards != null && !numShards.equals(-1) - ? null - : (targetPartitionSize == null || targetPartitionSize.equals(-1) - ? DEFAULT_TARGET_PARTITION_SIZE - : targetPartitionSize); + this.targetPartitionSize = initializeTargetPartitionSize(numShards, targetPartitionSize); this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; - this.maxTotalRows = maxTotalRows == null - ? DEFAULT_MAX_TOTAL_ROWS - : maxTotalRows; + this.maxTotalRows = initializeMaxTotalRows(numShards, maxTotalRows); this.numShards = numShards == null || numShards.equals(-1) ? null : numShards; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; this.maxPendingPersists = maxPendingPersists == null ? 
DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists; @@ -1046,7 +1016,7 @@ private IndexTuningConfig( this.reportParseExceptions = reportParseExceptions == null ? DEFAULT_REPORT_PARSE_EXCEPTIONS : reportParseExceptions; - this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout; + this.pushTimeout = pushTimeout == null ? DEFAULT_PUSH_TIMEOUT : pushTimeout; this.basePersistDirectory = basePersistDirectory; Preconditions.checkArgument( @@ -1057,6 +1027,26 @@ private IndexTuningConfig( this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; } + private static Integer initializeTargetPartitionSize(Integer numShards, Integer targetPartitionSize) + { + if (numShards == null || numShards == -1) { + return targetPartitionSize == null || targetPartitionSize.equals(-1) + ? DEFAULT_TARGET_PARTITION_SIZE + : targetPartitionSize; + } else { + return null; + } + } + + private static Long initializeMaxTotalRows(Integer numShards, Long maxTotalRows) + { + if (numShards == null || numShards == -1) { + return maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows; + } else { + return null; + } + } + public IndexTuningConfig withBasePersistDirectory(File dir) { return new IndexTuningConfig( @@ -1069,7 +1059,7 @@ public IndexTuningConfig withBasePersistDirectory(File dir) forceExtendableShardSpecs, forceGuaranteedRollup, reportParseExceptions, - publishTimeout, + pushTimeout, dir, segmentWriteOutMediumFactory ); @@ -1089,7 +1079,7 @@ public int getMaxRowsInMemory() } @JsonProperty - public int getMaxTotalRows() + public Long getMaxTotalRows() { return maxTotalRows; } @@ -1150,9 +1140,9 @@ public boolean isReportParseExceptions() } @JsonProperty - public long getPublishTimeout() + public long getPushTimeout() { - return publishTimeout; + return pushTimeout; } @Override @@ -1180,12 +1170,12 @@ public boolean equals(Object o) } IndexTuningConfig that = (IndexTuningConfig) o; return maxRowsInMemory == that.maxRowsInMemory && - maxTotalRows == that.maxTotalRows && + Objects.equals(maxTotalRows, that.maxTotalRows) && maxPendingPersists == that.maxPendingPersists && forceExtendableShardSpecs == that.forceExtendableShardSpecs && forceGuaranteedRollup == that.forceGuaranteedRollup && reportParseExceptions == that.reportParseExceptions && - publishTimeout == that.publishTimeout && + pushTimeout == that.pushTimeout && Objects.equals(targetPartitionSize, that.targetPartitionSize) && Objects.equals(numShards, that.numShards) && Objects.equals(indexSpec, that.indexSpec) && @@ -1207,7 +1197,7 @@ public int hashCode() forceExtendableShardSpecs, forceGuaranteedRollup, reportParseExceptions, - publishTimeout, + pushTimeout, segmentWriteOutMediumFactory ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java index 7bccdbab3d83..017158200806 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java @@ -227,7 +227,7 @@ private static IndexTuningConfig createTuningConfig() return new IndexTuningConfig( 5000000, 500000, - 1000000, + 1000000L, null, null, new IndexSpec( @@ -241,6 +241,7 @@ private static IndexTuningConfig createTuningConfig() false, true, false, + null, 100L, null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java 
b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index ece5c55c7eb1..d0cf4e39d2cf 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -57,14 +57,14 @@ import io.druid.segment.IndexMergerV9; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; -import io.druid.segment.transform.ExpressionTransform; -import io.druid.segment.transform.TransformSpec; import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; +import io.druid.segment.transform.ExpressionTransform; +import io.druid.segment.transform.TransformSpec; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.NoneShardSpec; @@ -581,7 +581,7 @@ public void testWithSmallMaxTotalRows() throws Exception Granularities.MINUTE, null ), - createTuningConfig(2, 2, 2, null, false, false, true), + createTuningConfig(2, 2, 2L, null, false, false, true), false ), null @@ -623,7 +623,7 @@ public void testPerfectRollup() throws Exception true, null ), - createTuningConfig(3, 2, 2, null, false, true, true), + createTuningConfig(3, 2, 2L, null, false, true, true), false ), null @@ -664,7 +664,7 @@ public void testBestEffortRollup() throws Exception true, null ), - createTuningConfig(3, 2, 2, null, false, false, true), + createTuningConfig(3, 2, 2L, null, false, false, true), false ), null @@ -1128,7 +1128,7 @@ private static IndexTuningConfig createTuningConfig( private static IndexTuningConfig createTuningConfig( Integer targetPartitionSize, Integer maxRowsInMemory, - Integer maxTotalRows, + Long maxTotalRows, Integer numShards, boolean forceExtendableShardSpecs, boolean forceGuaranteedRollup, @@ -1148,6 +1148,7 @@ private static IndexTuningConfig createTuningConfig( forceGuaranteedRollup, reportParseException, null, + null, null ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index b0484c882239..dd94f9508de1 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -190,7 +190,7 @@ public void testIndexTaskSerde() throws Exception jsonMapper ), new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, false, null, null, null) + new IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, false, null, null, null, null) ), null ); @@ -253,7 +253,7 @@ public void testIndexTaskwithResourceSerde() throws Exception jsonMapper ), new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null) + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null, null) ), null ); diff --git 
a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 50912834680b..e78046de7fc5 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -671,7 +671,7 @@ public void testIndexTask() throws Exception mapper ), new IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null) + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null, null) ), null ); @@ -729,7 +729,7 @@ public void testIndexTaskFailure() throws Exception mapper ), new IndexIOConfig(new MockExceptionalFirehoseFactory(), false), - new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null) + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null, null) ), null ); @@ -1094,7 +1094,7 @@ public void testResumeTasks() throws Exception mapper ), new IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null, null) + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null, null, null) ), null ); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java index 564ed4b9181d..dbd1ed831378 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java @@ -26,6 +26,7 @@ import io.druid.query.QuerySegmentWalker; import io.druid.segment.incremental.IndexSizeExceededException; +import javax.annotation.Nullable; import java.io.Closeable; import java.util.Collection; import java.util.List; @@ -35,9 +36,9 @@ * both of those. It can also push data to deep storage. But, it does not decide which segments data should go into. * It also doesn't publish segments to the metadata store or monitor handoff; you have to do that yourself! *
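The paragraph above draws the boundary this refactoring leans on: an Appenderator indexes rows, persists them to local disk, and pushes them to deep storage, but publishing segment metadata and monitoring handoff are left to the caller — in practice, to the drivers. A drastically simplified, self-contained stand-in to make that boundary concrete; nothing here is Druid API:

```java
import java.util.ArrayList;
import java.util.List;

public class AppenderatorFlowSketch
{
  // A toy "appenderator": it can index, "persist", and "push", but deliberately
  // knows nothing about the metadata store or handoff.
  static class MiniAppenderator
  {
    final List<String> inMemory = new ArrayList<>();
    final List<String> onDisk = new ArrayList<>();

    void add(String row)  { inMemory.add(row); }
    void persistAll()     { onDisk.addAll(inMemory); inMemory.clear(); }
    String push()         { return "deep-storage://segment[" + onDisk.size() + " rows]"; }
  }

  public static void main(String[] args)
  {
    MiniAppenderator appenderator = new MiniAppenderator();
    appenderator.add("row1");
    appenderator.add("row2");
    appenderator.persistAll();
    String location = appenderator.push();
    // Publishing the pushed segment and waiting for handoff is the caller's job;
    // in Druid, the appenderator drivers take care of it.
    System.out.println("pushed to " + location + "; publish and hand off yourself");
  }
}
```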

    - * Any time you call one of the methods that adds, persists, or pushes data, you must provide a Committer, or a - * Supplier of one, that represents all data you have given to the Appenderator so far. The Committer will be used when - * that data has been persisted to disk. + * You can provide a {@link Committer} or a Supplier of one when you call one of the methods that adds, persists, or + * pushes data. The Committer should represent all data you have given to the Appenderator so far. This Committer will + * be used when that data has been persisted to disk. */ public interface Appenderator extends QuerySegmentWalker, Closeable { @@ -72,8 +73,8 @@ default AppenderatorAddResult add(SegmentIdentifier identifier, InputRow row, Su * Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used * asynchronously. *

    - * The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the - * metadata committed by Committer in sync. + * If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll, + * and push methods should all be called from the same thread to keep the metadata committed by Committer in sync. * * @param identifier the segment into which this row should be added * @param row the row to add @@ -94,7 +95,7 @@ default AppenderatorAddResult add(SegmentIdentifier identifier, InputRow row, Su AppenderatorAddResult add( SegmentIdentifier identifier, InputRow row, - Supplier committerSupplier, + @Nullable Supplier committerSupplier, boolean allowIncrementalPersists ) throws IndexSizeExceededException, SegmentNotWritableException; @@ -152,8 +153,8 @@ AppenderatorAddResult add( * persist, but will actually be used asynchronously. Any metadata returned by the committer will be associated with * the data persisted to disk. *

    - * The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the - * metadata committed by Committer in sync. + * If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll, + * and push methods should all be called from the same thread to keep the metadata committed by Committer in sync. * * @param identifiers segment identifiers to be persisted * @param committer a committer associated with all data that has been added to segments of the given identifiers so @@ -162,7 +163,7 @@ AppenderatorAddResult add( * @return future that resolves when all pending data to segments of the identifiers has been persisted, contains * commit metadata for this persist */ - ListenableFuture persist(Collection identifiers, Committer committer); + ListenableFuture persist(Collection identifiers, @Nullable Committer committer); /** * Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the @@ -170,14 +171,14 @@ AppenderatorAddResult add( * be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to * disk. *

    - * The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the - * metadata committed by Committer in sync. + * If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll, + * and push methods should all be called from the same thread to keep the metadata committed by Committer in sync. * * @param committer a committer associated with all data that has been added so far * * @return future that resolves when all pending data has been persisted, contains commit metadata for this persist */ - default ListenableFuture persistAll(Committer committer) + default ListenableFuture persistAll(@Nullable Committer committer) { return persist(getSegments(), committer); } @@ -188,8 +189,8 @@ default ListenableFuture persistAll(Committer committer) *

    * After this method is called, you cannot add new data to any segments that were previously under construction. *

    - * The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the - * metadata committed by Committer in sync. + * If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll, + * and push methods should all be called from the same thread to keep the metadata committed by Committer in sync. * * @param identifiers list of segments to push * @param committer a committer associated with all data that has been added so far @@ -197,7 +198,7 @@ default ListenableFuture persistAll(Committer committer) * @return future that resolves when all segments have been pushed. The segment list will be the list of segments * that have been pushed and the commit metadata from the Committer. */ - ListenableFuture push(Collection identifiers, Committer committer); + ListenableFuture push(Collection identifiers, @Nullable Committer committer); /** * Stop any currently-running processing and clean up after ourselves. This allows currently running persists and pushes diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java deleted file mode 100644 index 82dc9ef6759a..000000000000 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java +++ /dev/null @@ -1,776 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package io.druid.segment.realtime.appenderator; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; -import io.druid.data.input.Committer; -import io.druid.data.input.InputRow; -import io.druid.java.util.common.ISE; -import io.druid.java.util.common.concurrent.Execs; -import io.druid.java.util.common.logger.Logger; -import io.druid.query.SegmentDescriptor; -import io.druid.segment.realtime.FireDepartmentMetrics; -import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; -import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; -import org.joda.time.DateTime; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -/** - * A AppenderatorDriver drives an Appenderator to index a finite stream of data. This class does not help you - * index unbounded streams. All handoff is done at the end of indexing. - *
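The file deleted here is the heart of the refactoring: the single AppenderatorDriver that served both batch and streaming tasks is replaced by BaseAppenderatorDriver plus the two specializations listed in the diffstat. As a rough map of how the two new lifecycles differ — toy classes whose method names follow this patch, but whose signatures and bodies are placeholders rather than the real APIs:

```java
public class DriverSplitSketch
{
  static class MiniBatchDriver
  {
    void startJob() {}
    void add(String row, String sequenceName) {}
    void pushAllAndClear() { System.out.println("pushed and dropped all open segments"); }
    void publishAll()      { System.out.println("published once, at the end; no handoff"); }
  }

  static class MiniStreamDriver
  {
    void startJob() {}
    void add(String row, String sequenceName, Runnable committer) {}
    void persist(Runnable committer) { committer.run(); }
    void publishAndRegisterHandoff(String sequenceName)
    {
      System.out.println("published " + sequenceName + "; awaiting handoff");
    }
  }

  public static void main(String[] args)
  {
    // Batch (IndexTask): no committer, no handoff; push incrementally, publish once.
    MiniBatchDriver batch = new MiniBatchDriver();
    batch.startJob();
    batch.add("row", "task-id-sequence");
    batch.pushAllAndClear(); // may also run mid-ingestion to bound memory/disk use
    batch.publishAll();

    // Stream (KafkaIndexTask): committer-tracked metadata, publish per sequence, then handoff.
    MiniStreamDriver stream = new MiniStreamDriver();
    stream.startJob();
    stream.add("row", "kafka-sequence-0", () -> {});
    stream.persist(() -> {});
    stream.publishAndRegisterHandoff("kafka-sequence-0");
  }
}
```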

    - * This class helps with doing things that Appenderators don't, including deciding which segments to use (with a - * SegmentAllocator), publishing segments to the metadata store (with a SegmentPublisher), and monitoring handoff (with - * a SegmentHandoffNotifier). - *
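Those three collaborators survive the split. In this patch, `KafkaIndexTask.newDriver()` wires a StreamAppenderatorDriver with an ActionBasedSegmentAllocator and the toolbox's handoff notifier factory, while `IndexTask.newDriver()` builds a BatchAppenderatorDriver from just an allocator and a used-segment checker. A minimal model of the three roles — interfaces shaped after, but not identical to, Druid's SegmentAllocator, TransactionalSegmentPublisher, and SegmentHandoffNotifier:

```java
public class DriverDependenciesSketch
{
  interface Allocator { String allocate(String sequenceName, String timestamp); }
  interface Publisher { boolean publish(String segment, Object commitMetadata); }
  interface HandoffNotifier { void onHandoff(String segment, Runnable callback); }

  static void drive(Allocator allocator, Publisher publisher, HandoffNotifier notifier)
  {
    // 1. Decide which segment a row belongs to.
    String segment = allocator.allocate("sequence-0", "2018-01-22T00:00:00Z");
    // 2. Publish the pushed segment's metadata transactionally.
    boolean published = publisher.publish(segment, "commit-metadata");
    // 3. Wait for the segment to be served elsewhere, then drop it locally.
    if (published) {
      notifier.onHandoff(segment, () -> System.out.println(segment + " handed off; dropping"));
    }
  }

  public static void main(String[] args)
  {
    drive(
        (sequenceName, timestamp) -> sequenceName + "_" + timestamp,
        (segment, commitMetadata) -> true,
        (segment, callback) -> callback.run()
    );
  }
}
```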

    - * Note that the commit metadata stored by this class via the underlying Appenderator is not the same metadata as - * you pass in. It's wrapped in some extra metadata needed by the driver. - */ -public class AppenderatorDriver implements Closeable -{ - private static final Logger log = new Logger(AppenderatorDriver.class); - - private final Appenderator appenderator; - private final SegmentAllocator segmentAllocator; - private final SegmentHandoffNotifier handoffNotifier; - private final UsedSegmentChecker usedSegmentChecker; - private final ObjectMapper objectMapper; - private final FireDepartmentMetrics metrics; - - enum SegmentState - { - ACTIVE, - INACTIVE, - PUBLISHING - } - - static class SegmentWithState - { - private SegmentIdentifier segmentIdentifier; - private SegmentState state; - - @JsonCreator - SegmentWithState( - @JsonProperty("segmentIdentifier") SegmentIdentifier segmentIdentifier, - @JsonProperty("state") SegmentState state - ) - { - this.segmentIdentifier = segmentIdentifier; - this.state = state; - } - - @JsonProperty - public SegmentIdentifier getSegmentIdentifier() - { - return segmentIdentifier; - } - - @JsonProperty - public SegmentState getState() - { - return state; - } - - @Override - public String toString() - { - return "SegmentWithState{" + - "segmentIdentifier=" + segmentIdentifier + - ", state=" + state + - '}'; - } - } - - // sequenceName -> {Interval Start millis -> List of Segments for this interval} - // there might be multiple segments for a start interval, for example one segment - // can be in ACTIVE state and others might be in PUBLISHING state - private final Map>> segments = new TreeMap<>(); - - private final Set publishingSequences = new HashSet<>(); - - // sequenceName -> most recently allocated segment - private final Map lastSegmentIds = Maps.newHashMap(); - - private final ListeningExecutorService publishExecutor; - - /** - * Create a driver. - * - * @param appenderator appenderator - * @param segmentAllocator segment allocator - * @param handoffNotifierFactory handoff notifier factory - * @param usedSegmentChecker used segment checker - * @param objectMapper object mapper, used for serde of commit metadata - * @param metrics Firedepartment metrics - */ - public AppenderatorDriver( - Appenderator appenderator, - SegmentAllocator segmentAllocator, - SegmentHandoffNotifierFactory handoffNotifierFactory, - UsedSegmentChecker usedSegmentChecker, - ObjectMapper objectMapper, - FireDepartmentMetrics metrics - ) - { - this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator"); - this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator"); - this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory") - .createSegmentHandoffNotifier(appenderator.getDataSource()); - this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker"); - this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper"); - this.metrics = Preconditions.checkNotNull(metrics, "metrics"); - this.publishExecutor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d")); - } - - @VisibleForTesting - Map>> getSegments() - { - return segments; - } - - /** - * Perform any initial setup and return currently persisted commit metadata. - *

    - * Note that this method returns the same metadata you've passed in with your Committers, even though this class - * stores extra metadata on disk. - * - * @return currently persisted commit metadata - */ - public Object startJob() - { - handoffNotifier.start(); - - final AppenderatorDriverMetadata metadata = objectMapper.convertValue( - appenderator.startJob(), - AppenderatorDriverMetadata.class - ); - - log.info("Restored metadata[%s].", metadata); - - if (metadata != null) { - synchronized (segments) { - for (Map.Entry> entry : metadata.getSegments().entrySet()) { - final String sequenceName = entry.getKey(); - final TreeMap> segmentMap = Maps.newTreeMap(); - - segments.put(sequenceName, segmentMap); - - for (SegmentWithState segmentWithState : entry.getValue()) { - segmentMap.computeIfAbsent( - segmentWithState.getSegmentIdentifier().getInterval().getStartMillis(), - k -> new LinkedList<>() - ); - LinkedList segmentList = segmentMap.get(segmentWithState.getSegmentIdentifier() - .getInterval() - .getStartMillis()); - // always keep the ACTIVE segment for an interval start millis in the front - if (segmentWithState.getState() == SegmentState.ACTIVE) { - segmentList.addFirst(segmentWithState); - } else { - segmentList.addLast(segmentWithState); - } - } - } - lastSegmentIds.putAll(metadata.getLastSegmentIds()); - } - - return metadata.getCallerMetadata(); - } else { - return null; - } - } - - private void addSegment(String sequenceName, SegmentIdentifier identifier) - { - synchronized (segments) { - segments.computeIfAbsent(sequenceName, k -> new TreeMap<>()) - .computeIfAbsent(identifier.getInterval().getStartMillis(), k -> new LinkedList<>()) - .addFirst(new SegmentWithState(identifier, SegmentState.ACTIVE)); - lastSegmentIds.put(sequenceName, identifier.getIdentifierAsString()); - } - } - - /** - * Clears out all our state and also calls {@link Appenderator#clear()} on the underlying Appenderator. - */ - public void clear() throws InterruptedException - { - synchronized (segments) { - segments.clear(); - } - appenderator.clear(); - } - - public AppenderatorDriverAddResult add( - final InputRow row, - final String sequenceName, - final Supplier committerSupplier - ) throws IOException - { - return add(row, sequenceName, committerSupplier, false, true); - } - - /** - * Add a row. Must not be called concurrently from multiple threads. - * - * @param row the row to add - * @param sequenceName sequenceName for this row's segment - * @param committerSupplier supplier of a committer associated with all data that has been added, including this row - * if {@param allowIncrementalPersists} is set to false then this will not be used - * @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence. 
- * Should be set to false if replica tasks would index events in same order - * @param allowIncrementalPersists whether to allow persist to happen when maxRowsInMemory or intermediate persist period - * threshold is hit - * - * @return {@link AppenderatorDriverAddResult} - * - * @throws IOException if there is an I/O error while allocating or writing to a segment - */ - - public AppenderatorDriverAddResult add( - final InputRow row, - final String sequenceName, - final Supplier committerSupplier, - final boolean skipSegmentLineageCheck, - final boolean allowIncrementalPersists - ) throws IOException - { - Preconditions.checkNotNull(row, "row"); - Preconditions.checkNotNull(sequenceName, "sequenceName"); - Preconditions.checkNotNull(committerSupplier, "committerSupplier"); - - final SegmentIdentifier identifier = getSegment(row, sequenceName, skipSegmentLineageCheck); - - if (identifier != null) { - try { - final Appenderator.AppenderatorAddResult result = appenderator.add( - identifier, - row, - wrapCommitterSupplier(committerSupplier), - allowIncrementalPersists - ); - return AppenderatorDriverAddResult.ok( - identifier, - result.getNumRowsInSegment(), - appenderator.getTotalRowCount(), - result.isPersistRequired() - ); - } - catch (SegmentNotWritableException e) { - throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", identifier); - } - } else { - return AppenderatorDriverAddResult.fail(); - } - } - - /** - * Persist all data indexed through this driver so far. Blocks until complete. - *

    - * Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}. - * - * @param committer committer representing all data that has been added so far - * - * @return commitMetadata persisted - */ - public Object persist(final Committer committer) throws InterruptedException - { - try { - log.info("Persisting data."); - final long start = System.currentTimeMillis(); - final Object commitMetadata = appenderator.persistAll(wrapCommitter(committer)).get(); - log.info("Persisted pending data in %,dms.", System.currentTimeMillis() - start); - return commitMetadata; - } - catch (InterruptedException e) { - throw e; - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - /** - * Persist all data indexed through this driver so far. Returns a future of persisted commitMetadata. - *

    - * Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}. - * - * @param committer committer representing all data that has been added so far - * - * @return future containing commitMetadata persisted - */ - public ListenableFuture persistAsync(final Committer committer) - throws InterruptedException, ExecutionException - { - log.info("Persisting data asynchronously"); - return appenderator.persistAll(wrapCommitter(committer)); - } - - /** - * Register the segments in the given {@link SegmentsAndMetadata} to be handed off and execute a background task which - * waits until the hand off completes. - * - * @param segmentsAndMetadata the result segments and metadata of - * {@link #publish(TransactionalSegmentPublisher, Committer, Collection)} - * - * @return null if the input segmentsAndMetadata is null. Otherwise, a {@link ListenableFuture} for the submitted task - * which returns {@link SegmentsAndMetadata} containing the segments successfully handed off and the metadata - * of the caller of {@link AppenderatorDriverMetadata} - */ - public ListenableFuture registerHandoff(SegmentsAndMetadata segmentsAndMetadata) - { - if (segmentsAndMetadata == null) { - return Futures.immediateFuture(null); - - } else { - final List waitingSegmentIdList = segmentsAndMetadata.getSegments().stream() - .map(SegmentIdentifier::fromDataSegment) - .collect(Collectors.toList()); - - if (waitingSegmentIdList.isEmpty()) { - return Futures.immediateFuture( - new SegmentsAndMetadata( - segmentsAndMetadata.getSegments(), - ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) - .getCallerMetadata() - ) - ); - } - - log.info("Register handoff of segments: [%s]", waitingSegmentIdList); - - final SettableFuture resultFuture = SettableFuture.create(); - final AtomicInteger numRemainingHandoffSegments = new AtomicInteger(waitingSegmentIdList.size()); - - for (final SegmentIdentifier segmentIdentifier : waitingSegmentIdList) { - handoffNotifier.registerSegmentHandoffCallback( - new SegmentDescriptor( - segmentIdentifier.getInterval(), - segmentIdentifier.getVersion(), - segmentIdentifier.getShardSpec().getPartitionNum() - ), - MoreExecutors.sameThreadExecutor(), - () -> { - log.info("Segment[%s] successfully handed off, dropping.", segmentIdentifier); - metrics.incrementHandOffCount(); - - final ListenableFuture dropFuture = appenderator.drop(segmentIdentifier); - Futures.addCallback( - dropFuture, - new FutureCallback() - { - @Override - public void onSuccess(Object result) - { - if (numRemainingHandoffSegments.decrementAndGet() == 0) { - log.info("Successfully handed off [%d] segments.", segmentsAndMetadata.getSegments().size()); - resultFuture.set( - new SegmentsAndMetadata( - segmentsAndMetadata.getSegments(), - ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) - .getCallerMetadata() - ) - ); - } - } - - @Override - public void onFailure(Throwable e) - { - log.warn(e, "Failed to drop segment[%s]?!", segmentIdentifier); - numRemainingHandoffSegments.decrementAndGet(); - resultFuture.setException(e); - } - } - ); - } - ); - } - - return resultFuture; - } - } - - /** - * Closes this driver. Does not close the underlying Appenderator; you should do that yourself. 
- */ - @Override - public void close() - { - publishExecutor.shutdownNow(); - handoffNotifier.close(); - } - - private SegmentIdentifier getActiveSegment(final DateTime timestamp, final String sequenceName) - { - synchronized (segments) { - final NavigableMap> segmentsForSequence = segments.get(sequenceName); - - if (segmentsForSequence == null) { - return null; - } - - final Map.Entry> candidateEntry = segmentsForSequence.floorEntry(timestamp.getMillis()); - if (candidateEntry != null - && candidateEntry.getValue().getFirst().getSegmentIdentifier().getInterval().contains(timestamp) - && candidateEntry.getValue().getFirst().getState().equals(SegmentState.ACTIVE)) { - return candidateEntry.getValue().getFirst().getSegmentIdentifier(); - } else { - return null; - } - } - } - - /** - * Return a segment usable for "timestamp". May return null if no segment can be allocated. - * - * @param row input row - * @param sequenceName sequenceName for potential segment allocation - * @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence. - * Should be set to false if replica tasks would index events in same order - * - * @return identifier, or null - * - * @throws IOException if an exception occurs while allocating a segment - */ - private SegmentIdentifier getSegment( - final InputRow row, - final String sequenceName, - final boolean skipSegmentLineageCheck - ) throws IOException - { - synchronized (segments) { - final DateTime timestamp = row.getTimestamp(); - final SegmentIdentifier existing = getActiveSegment(timestamp, sequenceName); - if (existing != null) { - return existing; - } else { - // Allocate new segment. - final SegmentIdentifier newSegment = segmentAllocator.allocate( - row, - sequenceName, - lastSegmentIds.get(sequenceName), - // send lastSegmentId irrespective of skipSegmentLineageCheck so that - // unique constraint for sequence_name_prev_id_sha1 does not fail for - // allocatePendingSegment in IndexerSQLMetadataStorageCoordinator - skipSegmentLineageCheck - ); - - if (newSegment != null) { - for (SegmentIdentifier identifier : appenderator.getSegments()) { - if (identifier.equals(newSegment)) { - throw new ISE( - "WTF?! Allocated segment[%s] which conflicts with existing segment[%s].", - newSegment, - identifier - ); - } - } - - log.info("New segment[%s] for sequenceName[%s].", newSegment, sequenceName); - addSegment(sequenceName, newSegment); - } else { - // Well, we tried. - log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", timestamp, sequenceName); - } - - return newSegment; - } - } - } - - /** - * Move a set of identifiers out from "active", making way for newer segments. - */ - public void moveSegmentOut(final String sequenceName, final List identifiers) - { - synchronized (segments) { - final NavigableMap> activeSegmentsForSequence = segments.get(sequenceName); - if (activeSegmentsForSequence == null) { - throw new ISE("WTF?! 
Asked to remove segments for sequenceName[%s] which doesn't exist...", sequenceName); - } - - for (final SegmentIdentifier identifier : identifiers) { - log.info("Moving segment[%s] out of active list.", identifier); - final long key = identifier.getInterval().getStartMillis(); - if (activeSegmentsForSequence.get(key) == null || activeSegmentsForSequence.get(key).stream().noneMatch( - segmentWithState -> { - if (segmentWithState.getSegmentIdentifier().equals(identifier)) { - segmentWithState.state = SegmentState.INACTIVE; - return true; - } else { - return false; - } - } - )) { - throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier); - } - } - } - } - - /** - * Publish all pending segments. - * - * @param publisher segment publisher - * @param committer committer - * - * @return a {@link ListenableFuture} for the publish task which removes published {@code sequenceNames} from - * {@code activeSegments} and {@code publishPendingSegments} - */ - public ListenableFuture publishAll( - final TransactionalSegmentPublisher publisher, - final Committer committer - ) - { - final List theSequences; - synchronized (segments) { - theSequences = ImmutableList.copyOf(segments.keySet()); - } - return publish(publisher, wrapCommitter(committer), theSequences); - } - - /** - * Execute a task in background to publish all segments corresponding to the given sequence names. The task - * internally pushes the segments to the deep storage first, and then publishes the metadata to the metadata storage. - * - * @param publisher segment publisher - * @param committer committer - * @param sequenceNames a collection of sequence names to be published - * - * @return a {@link ListenableFuture} for the submitted task which removes published {@code sequenceNames} from - * {@code activeSegments} and {@code publishPendingSegments} - */ - public ListenableFuture publish( - final TransactionalSegmentPublisher publisher, - final Committer committer, - final Collection sequenceNames - ) - { - final List theSegments = new ArrayList<>(); - synchronized (segments) { - sequenceNames.stream() - .filter(sequenceName -> !publishingSequences.contains(sequenceName)) - .forEach(sequenceName -> { - if (segments.containsKey(sequenceName)) { - publishingSequences.add(sequenceName); - segments.get(sequenceName) - .values() - .stream() - .flatMap(Collection::stream) - .forEach(segmentWithState -> { - segmentWithState.state = SegmentState.PUBLISHING; - theSegments.add(segmentWithState.getSegmentIdentifier()); - }); - } - }); - } - - final ListenableFuture publishFuture = publish( - publisher, - wrapCommitter(committer), - theSegments - ); - - Futures.addCallback( - publishFuture, - new FutureCallback() - { - @Override - public void onSuccess(SegmentsAndMetadata result) - { - if (result != null) { - publishingSequences.removeAll(sequenceNames); - sequenceNames.forEach(segments::remove); - } - } - - @Override - public void onFailure(Throwable t) - { - // Do nothing, caller should handle the exception - log.error("Error publishing sequences [%s]", sequenceNames); - } - } - ); - - return publishFuture; - } - - /** - * Execute a task in background to publish the given segments. The task blocks until complete. - * Retries forever on transient failures, but may exit early on permanent failures. - *

    - * Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}. - * - * @param publisher publisher to use for this set of segments - * @param wrappedCommitter committer representing all data that has been added so far - * - * @return segments and metadata published if successful, or null if segments could not be handed off due to - * transaction failure with commit metadata. - */ - private ListenableFuture publish( - final TransactionalSegmentPublisher publisher, - final WrappedCommitter wrappedCommitter, - final List segmentIdentifiers - ) - { - log.info("Pushing segments: [%s]", Joiner.on(", ").join(segmentIdentifiers)); - - return Futures.transform( - appenderator.push(segmentIdentifiers, wrappedCommitter), - (Function) segmentsAndMetadata -> { - // Sanity check - final Set pushedSegments = segmentsAndMetadata.getSegments().stream() - .map(SegmentIdentifier::fromDataSegment) - .collect(Collectors.toSet()); - if (!pushedSegments.equals(Sets.newHashSet(segmentIdentifiers))) { - throw new ISE( - "WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].", - pushedSegments, - segmentIdentifiers - ); - } - - if (segmentsAndMetadata.getSegments().isEmpty()) { - log.info("Nothing to publish, skipping publish step."); - } else { - log.info( - "Publishing segments with commitMetadata[%s]: [%s]", - segmentsAndMetadata.getCommitMetadata(), - Joiner.on(", ").join(segmentsAndMetadata.getSegments()) - ); - - try { - final boolean published = publisher.publishSegments( - ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), - ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() - ); - - if (published) { - log.info("Published segments."); - } else { - log.info("Transaction failure while publishing segments, checking if someone else beat us to it."); - if (usedSegmentChecker.findUsedSegments(pushedSegments) - .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) { - log.info("Our segments really do exist, awaiting handoff."); - } else { - throw new ISE("Failed to publish segments[%s]", segmentIdentifiers); - } - } - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - - return segmentsAndMetadata; - }, - publishExecutor - ); - } - - public ListenableFuture publishAndRegisterHandoff( - final TransactionalSegmentPublisher publisher, - final Committer committer, - final Collection sequenceNames - ) - { - return Futures.transform( - publish(publisher, committer, sequenceNames), - this::registerHandoff - ); - } - - private interface WrappedCommitter extends Committer - { - } - - private Supplier wrapCommitterSupplier(final Supplier committerSupplier) - { - return () -> wrapCommitter(committerSupplier.get()); - } - - private WrappedCommitter wrapCommitter(final Committer committer) - { - final AppenderatorDriverMetadata wrappedMetadata; - synchronized (segments) { - wrappedMetadata = new AppenderatorDriverMetadata( - ImmutableMap.copyOf( - Maps.transformValues( - segments, - (Function>, List>) input -> ImmutableList - .copyOf(input.values().stream().flatMap(Collection::stream).collect(Collectors.toList())) - ) - ), - ImmutableMap.copyOf(lastSegmentIds), - committer.getMetadata() - ); - } - - return new WrappedCommitter() - { - @Override - public Object getMetadata() - { - return wrappedMetadata; - } - - @Override - public void run() - { - committer.run(); - } - }; - } -} diff --git 
a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java index 4838d58a227f..c5afa3a78205 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java @@ -25,7 +25,7 @@ import javax.annotation.Nullable; /** - * Result of {@link AppenderatorDriver#add(InputRow, String, Supplier, boolean)}. It contains the identifier of the + * Result of {@link BaseAppenderatorDriver#add(InputRow, String, Supplier, boolean)}. It contains the identifier of the * segment which the InputRow is added to, the number of rows in that segment and if persist is required because either * maxRowsInMemory or intermediate persist period threshold is hit. */ diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java index 0e8722c7d087..d1e002a66dab 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import java.util.ArrayList; import java.util.List; @@ -33,13 +34,13 @@ public class AppenderatorDriverMetadata { - private final Map> segments; + private final Map> segments; private final Map lastSegmentIds; private final Object callerMetadata; @JsonCreator public AppenderatorDriverMetadata( - @JsonProperty("segments") Map> segments, + @JsonProperty("segments") Map> segments, @JsonProperty("lastSegmentIds") Map lastSegmentIds, @JsonProperty("callerMetadata") Object callerMetadata, // Next two properties are for backwards compatibility, should be removed on versions greater than 0.12.x @@ -57,7 +58,7 @@ public AppenderatorDriverMetadata( ); if (segments == null) { // convert old metadata to new one - final Map> newMetadata = Maps.newHashMap(); + final Map> newMetadata = Maps.newHashMap(); final Set activeSegmentsAlreadySeen = Sets.newHashSet(); // temp data structure activeSegments.entrySet() @@ -67,10 +68,7 @@ public AppenderatorDriverMetadata( .stream() .map(segmentIdentifier -> { activeSegmentsAlreadySeen.add(segmentIdentifier.toString()); - return new AppenderatorDriver.SegmentWithState( - segmentIdentifier, - AppenderatorDriver.SegmentState.ACTIVE - ); + return SegmentWithState.newSegment(segmentIdentifier); }) .collect(Collectors.toList()) )); @@ -84,9 +82,9 @@ public AppenderatorDriverMetadata( .stream() .filter(segmentIdentifier -> !activeSegmentsAlreadySeen.contains( segmentIdentifier.toString())) - .map(segmentIdentifier -> new AppenderatorDriver.SegmentWithState( + .map(segmentIdentifier -> SegmentWithState.newSegment( segmentIdentifier, - AppenderatorDriver.SegmentState.INACTIVE + SegmentState.APPEND_FINISHED )) .collect(Collectors.toList()) )); @@ -99,7 +97,7 @@ public AppenderatorDriverMetadata( } public AppenderatorDriverMetadata( - Map> segments, + Map> segments, Map lastSegmentIds, Object callerMetadata ) @@ -108,7 +106,7 @@ public AppenderatorDriverMetadata( } @JsonProperty - public Map> getSegments() + public Map> getSegments() { return 
segments; } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index c8fd9c52d41d..94eb591e43f0 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -202,7 +202,7 @@ public Object startJob() public AppenderatorAddResult add( final SegmentIdentifier identifier, final InputRow row, - final Supplier committerSupplier, + @Nullable final Supplier committerSupplier, final boolean allowIncrementalPersists ) throws IndexSizeExceededException, SegmentNotWritableException { @@ -244,7 +244,7 @@ public AppenderatorAddResult add( || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) { if (allowIncrementalPersists) { // persistAll clears rowsCurrentlyInMemory, no need to update it. - persistAll(committerSupplier.get()); + persistAll(committerSupplier == null ? null : committerSupplier.get()); } else { isPersistRequired = true; } @@ -340,35 +340,37 @@ public void clear() throws InterruptedException // Drop commit metadata, then abandon all segments. try { - final ListenableFuture uncommitFuture = persistExecutor.submit( - new Callable() - { - @Override - public Object call() throws Exception + if (persistExecutor != null) { + final ListenableFuture uncommitFuture = persistExecutor.submit( + new Callable() { - try { - commitLock.lock(); - objectMapper.writeValue(computeCommitFile(), Committed.nil()); - } - finally { - commitLock.unlock(); + @Override + public Object call() throws Exception + { + try { + commitLock.lock(); + objectMapper.writeValue(computeCommitFile(), Committed.nil()); + } + finally { + commitLock.unlock(); + } + return null; } - return null; } - } - ); + ); - // Await uncommit. - uncommitFuture.get(); + // Await uncommit. + uncommitFuture.get(); - // Drop everything. - final List> futures = Lists.newArrayList(); - for (Map.Entry entry : sinks.entrySet()) { - futures.add(abandonSegment(entry.getKey(), entry.getValue(), true)); - } + // Drop everything. + final List> futures = Lists.newArrayList(); + for (Map.Entry entry : sinks.entrySet()) { + futures.add(abandonSegment(entry.getKey(), entry.getValue(), true)); + } - // Await dropping. - Futures.allAsList(futures).get(); + // Await dropping. + Futures.allAsList(futures).get(); + } } catch (ExecutionException e) { throw Throwables.propagate(e); @@ -387,7 +389,7 @@ public ListenableFuture drop(final SegmentIdentifier identifier) } @Override - public ListenableFuture persist(Collection identifiers, Committer committer) + public ListenableFuture persist(Collection identifiers, @Nullable Committer committer) { final Map currentHydrants = Maps.newHashMap(); final List> indexesToPersist = Lists.newArrayList(); @@ -418,7 +420,7 @@ public ListenableFuture persist(Collection identifier log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); final String threadName = StringUtils.format("%s-incremental-persist", schema.getDataSource()); - final Object commitMetadata = committer.getMetadata(); + final Object commitMetadata = committer == null ? 
null : committer.getMetadata(); final Stopwatch runExecStopwatch = Stopwatch.createStarted(); final Stopwatch persistStopwatch = Stopwatch.createStarted(); final ListenableFuture future = persistExecutor.submit( @@ -432,37 +434,39 @@ public Object doCall() metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs)); } - log.info( - "Committing metadata[%s] for sinks[%s].", commitMetadata, Joiner.on(", ").join( - currentHydrants.entrySet() - .stream() - .map(entry -> StringUtils.format( - "%s:%d", - entry.getKey(), - entry.getValue() - )) - .collect(Collectors.toList()) - ) - ); - - committer.run(); - - try { - commitLock.lock(); - final File commitFile = computeCommitFile(); - final Map commitHydrants = Maps.newHashMap(); - if (commitFile.exists()) { - // merge current hydrants with existing hydrants - final Committed oldCommitted = objectMapper.readValue(commitFile, Committed.class); - commitHydrants.putAll(oldCommitted.getHydrants()); + if (committer != null) { + log.info( + "Committing metadata[%s] for sinks[%s].", commitMetadata, Joiner.on(", ").join( + currentHydrants.entrySet() + .stream() + .map(entry -> StringUtils.format( + "%s:%d", + entry.getKey(), + entry.getValue() + )) + .collect(Collectors.toList()) + ) + ); + + committer.run(); + + try { + commitLock.lock(); + final Map commitHydrants = Maps.newHashMap(); + final Committed oldCommit = readCommit(); + if (oldCommit != null) { + // merge current hydrants with existing hydrants + commitHydrants.putAll(oldCommit.getHydrants()); + } + commitHydrants.putAll(currentHydrants); + writeCommit(new Committed(commitHydrants, commitMetadata)); + } + finally { + commitLock.unlock(); } - commitHydrants.putAll(currentHydrants); - objectMapper.writeValue(commitFile, new Committed(commitHydrants, commitMetadata)); - } - finally { - commitLock.unlock(); } + // return null if committer is null return commitMetadata; } catch (Exception e) { @@ -493,7 +497,7 @@ public Object doCall() } @Override - public ListenableFuture persistAll(final Committer committer) + public ListenableFuture persistAll(@Nullable final Committer committer) { // Submit persistAll task to the persistExecutor return persist(sinks.keySet(), committer); @@ -502,7 +506,7 @@ public ListenableFuture persistAll(final Committer committer) @Override public ListenableFuture push( final Collection identifiers, - final Committer committer + @Nullable final Committer committer ) { final Map theSinks = Maps.newHashMap(); @@ -707,6 +711,9 @@ public void close() intermediateTempExecutor == null || intermediateTempExecutor.awaitTermination(365, TimeUnit.DAYS), "intermediateTempExecutor not terminated" ); + persistExecutor = null; + pushExecutor = null; + intermediateTempExecutor = null; } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -746,6 +753,7 @@ public void closeNow() } try { shutdownExecutors(); + // We don't wait for pushExecutor to be terminated. See Javadoc for more details. 
Preconditions.checkState( persistExecutor == null || persistExecutor.awaitTermination(365, TimeUnit.DAYS), "persistExecutor not terminated" @@ -754,6 +762,8 @@ public void closeNow() intermediateTempExecutor == null || intermediateTempExecutor.awaitTermination(365, TimeUnit.DAYS), "intermediateTempExecutor not terminated" ); + persistExecutor = null; + intermediateTempExecutor = null; } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -1042,10 +1052,9 @@ public Object apply(@Nullable Object input) log.info("Removing commit metadata for segment[%s].", identifier); try { commitLock.lock(); - final File commitFile = computeCommitFile(); - if (commitFile.exists()) { - final Committed oldCommitted = objectMapper.readValue(commitFile, Committed.class); - objectMapper.writeValue(commitFile, oldCommitted.without(identifier.getIdentifierAsString())); + final Committed oldCommit = readCommit(); + if (oldCommit != null) { + writeCommit(oldCommit.without(identifier.getIdentifierAsString())); } } catch (Exception e) { @@ -1098,6 +1107,23 @@ public Object apply(@Nullable Object input) ); } + private Committed readCommit() throws IOException + { + final File commitFile = computeCommitFile(); + if (commitFile.exists()) { + // merge current hydrants with existing hydrants + return objectMapper.readValue(commitFile, Committed.class); + } else { + return null; + } + } + + private void writeCommit(Committed newCommit) throws IOException + { + final File commitFile = computeCommitFile(); + objectMapper.writeValue(commitFile, newCommit); + } + private File computeCommitFile() { return new File(tuningConfig.getBasePersistDirectory(), "commit.json"); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java new file mode 100644 index 000000000000..24482d19c96c --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -0,0 +1,541 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.realtime.appenderator; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.data.input.Committer; +import io.druid.data.input.InputRow; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.common.logger.Logger; +import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A BaseAppenderatorDriver drives an Appenderator to index a finite stream of data. This class does not help you + * index unbounded streams. All handoff is done at the end of indexing. + *

+ * This class helps with things that Appenderators don't do on their own, including deciding which segments to use
+ * (with a SegmentAllocator) and publishing segments to the metadata store (with a TransactionalSegmentPublisher).
+ *
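+ * For example, a SegmentAllocator for a fixed dataSource and hourly segments might be sketched as below (the
+ * dataSource name, version, and shard spec are illustrative placeholders; real tasks obtain them from segment
+ * allocation rather than hard-coding them):
+ *
+ * <pre>{@code
+ * final SegmentAllocator allocator = (row, sequenceName, previousSegmentId, skipSegmentLineageCheck) ->
+ *     new SegmentIdentifier(
+ *         "exampleDataSource",
+ *         Granularities.HOUR.bucket(row.getTimestamp()),
+ *         "2018-01-01T00:00:00.000Z",
+ *         new NumberedShardSpec(0, 0)
+ *     );
+ * }</pre>
+ *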

+ * This class has two child classes, {@link BatchAppenderatorDriver} and {@link StreamAppenderatorDriver}, used for
+ * batch and streaming ingestion, respectively. This class provides the fundamental methods the child classes are
+ * built on, such as {@link #pushInBackground}, {@link #dropInBackground}, and {@link #publishInBackground}.
+ *
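+ * For instance, a batch-style flow can chain these methods along the following lines (a sketch only, not a method
+ * of this class; {@code identifiers} and {@code publisher} are assumed to be provided by the caller):
+ *
+ * <pre>{@code
+ * final ListenableFuture<SegmentsAndMetadata> pushedAndDropped = Futures.transform(
+ *     pushInBackground(null, identifiers),
+ *     (AsyncFunction<SegmentsAndMetadata, SegmentsAndMetadata>) this::dropInBackground
+ * );
+ * final ListenableFuture<SegmentsAndMetadata> published = Futures.transform(
+ *     pushedAndDropped,
+ *     (AsyncFunction<SegmentsAndMetadata, SegmentsAndMetadata>) sam -> publishInBackground(sam, publisher)
+ * );
+ * }</pre>
+ *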

    + * Note that the commit metadata stored by this class via the underlying Appenderator is not the same metadata as + * you pass in. It's wrapped in some extra metadata needed by the driver. + */ +public abstract class BaseAppenderatorDriver implements Closeable +{ + /** + * Allocated segments for a sequence + */ + static class SegmentsForSequence + { + // Interval Start millis -> List of Segments for this interval + // there might be multiple segments for a start interval, for example one segment + // can be in APPENDING state and others might be in PUBLISHING state + private final NavigableMap> intervalToSegmentStates; + + // most recently allocated segment + private String lastSegmentId; + + SegmentsForSequence() + { + this.intervalToSegmentStates = new TreeMap<>(); + } + + SegmentsForSequence( + NavigableMap> intervalToSegmentStates, + String lastSegmentId + ) + { + this.intervalToSegmentStates = intervalToSegmentStates; + this.lastSegmentId = lastSegmentId; + } + + void add(SegmentIdentifier identifier) + { + intervalToSegmentStates.computeIfAbsent(identifier.getInterval().getStartMillis(), k -> new LinkedList<>()) + .addFirst(SegmentWithState.newSegment(identifier)); + lastSegmentId = identifier.getIdentifierAsString(); + } + + Entry> floor(long timestamp) + { + return intervalToSegmentStates.floorEntry(timestamp); + } + + LinkedList get(long timestamp) + { + return intervalToSegmentStates.get(timestamp); + } + + Stream segmentStateStream() + { + return intervalToSegmentStates.values().stream().flatMap(Collection::stream); + } + } + + private static final Logger log = new Logger(BaseAppenderatorDriver.class); + + private final SegmentAllocator segmentAllocator; + private final UsedSegmentChecker usedSegmentChecker; + + protected final Appenderator appenderator; + // sequenceName -> segmentsForSequence + // This map should be locked with itself before accessing it. + // Note: BatchAppenderatorDriver currently doesn't need to lock this map because it doesn't do anything concurrently. + // However, it's desired to do some operations like indexing and pushing at the same time. Locking this map is also + // required in BatchAppenderatorDriver once this feature is supported. + protected final Map segments = new TreeMap<>(); + protected final ListeningExecutorService executor; + + BaseAppenderatorDriver( + Appenderator appenderator, + SegmentAllocator segmentAllocator, + UsedSegmentChecker usedSegmentChecker + ) + { + this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator"); + this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator"); + this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker"); + this.executor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d")); + } + + @VisibleForTesting + Map getSegments() + { + return segments; + } + + /** + * Perform any initial setup and return currently persisted commit metadata. + *

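+ * For example (a sketch; {@code driver} is assumed to be a concrete {@link BatchAppenderatorDriver} or
+ * {@link StreamAppenderatorDriver}):
+ *
+ * <pre>{@code
+ * final Object callerMetadata = driver.startJob();
+ * // the metadata the caller last committed, or null on a fresh start
+ * }</pre>
+ *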
    + * Note that this method returns the same metadata you've passed in with your Committers, even though this class + * stores extra metadata on disk. + * + * @return currently persisted commit metadata + */ + @Nullable + public abstract Object startJob(); + + /** + * Find a segment in the {@link SegmentState#APPENDING} state for the given timestamp and sequenceName. + */ + private SegmentIdentifier getAppendableSegment(final DateTime timestamp, final String sequenceName) + { + synchronized (segments) { + final SegmentsForSequence segmentsForSequence = segments.get(sequenceName); + + if (segmentsForSequence == null) { + return null; + } + + final Map.Entry> candidateEntry = segmentsForSequence.floor( + timestamp.getMillis() + ); + if (candidateEntry != null + && candidateEntry.getValue().getFirst().getSegmentIdentifier().getInterval().contains(timestamp) + && candidateEntry.getValue().getFirst().getState() == SegmentState.APPENDING) { + return candidateEntry.getValue().getFirst().getSegmentIdentifier(); + } else { + return null; + } + } + } + + /** + * Return a segment usable for "timestamp". May return null if no segment can be allocated. + * + * @param row input row + * @param sequenceName sequenceName for potential segment allocation + * @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence. + * Should be set to false if replica tasks would index events in same order + * + * @return identifier, or null + * + * @throws IOException if an exception occurs while allocating a segment + */ + private SegmentIdentifier getSegment( + final InputRow row, + final String sequenceName, + final boolean skipSegmentLineageCheck + ) throws IOException + { + synchronized (segments) { + final DateTime timestamp = row.getTimestamp(); + final SegmentIdentifier existing = getAppendableSegment(timestamp, sequenceName); + if (existing != null) { + return existing; + } else { + // Allocate new segment. + final SegmentsForSequence segmentsForSequence = segments.get(sequenceName); + final SegmentIdentifier newSegment = segmentAllocator.allocate( + row, + sequenceName, + segmentsForSequence == null ? null : segmentsForSequence.lastSegmentId, + // send lastSegmentId irrespective of skipSegmentLineageCheck so that + // unique constraint for sequence_name_prev_id_sha1 does not fail for + // allocatePendingSegment in IndexerSQLMetadataStorageCoordinator + skipSegmentLineageCheck + ); + + if (newSegment != null) { + for (SegmentIdentifier identifier : appenderator.getSegments()) { + if (identifier.equals(newSegment)) { + throw new ISE( + "WTF?! Allocated segment[%s] which conflicts with existing segment[%s].", + newSegment, + identifier + ); + } + } + + log.info("New segment[%s] for row[%s] sequenceName[%s].", newSegment, row, sequenceName); + addSegment(sequenceName, newSegment); + } else { + // Well, we tried. + log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", timestamp, sequenceName); + } + + return newSegment; + } + } + } + + private void addSegment(String sequenceName, SegmentIdentifier identifier) + { + synchronized (segments) { + segments.computeIfAbsent(sequenceName, k -> new SegmentsForSequence()) + .add(identifier); + } + } + + /** + * Add a row. Must not be called concurrently from multiple threads. 
+ * + * @param row the row to add + * @param sequenceName sequenceName for this row's segment + * @param committerSupplier supplier of a committer associated with all data that has been added, including this row + * if {@param allowIncrementalPersists} is set to false then this will not be used + * @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence. + * Should be set to false if replica tasks would index events in same order + * @param allowIncrementalPersists whether to allow persist to happen when maxRowsInMemory or intermediate persist period + * threshold is hit + * + * @return {@link AppenderatorDriverAddResult} + * + * @throws IOException if there is an I/O error while allocating or writing to a segment + */ + protected AppenderatorDriverAddResult append( + final InputRow row, + final String sequenceName, + @Nullable final Supplier committerSupplier, + final boolean skipSegmentLineageCheck, + final boolean allowIncrementalPersists + ) throws IOException + { + Preconditions.checkNotNull(row, "row"); + Preconditions.checkNotNull(sequenceName, "sequenceName"); + + final SegmentIdentifier identifier = getSegment(row, sequenceName, skipSegmentLineageCheck); + + if (identifier != null) { + try { + final Appenderator.AppenderatorAddResult result = appenderator.add( + identifier, + row, + committerSupplier == null ? null : wrapCommitterSupplier(committerSupplier), + allowIncrementalPersists + ); + return AppenderatorDriverAddResult.ok( + identifier, + result.getNumRowsInSegment(), + appenderator.getTotalRowCount(), + result.isPersistRequired() + ); + } + catch (SegmentNotWritableException e) { + throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", identifier); + } + } else { + return AppenderatorDriverAddResult.fail(); + } + } + + /** + * Returns a stream of {@link SegmentWithState} for the given sequenceNames. + */ + Stream getSegmentWithStates(Collection sequenceNames) + { + synchronized (segments) { + return sequenceNames + .stream() + .map(segments::get) + .filter(Objects::nonNull) + .flatMap(segmentsForSequence -> segmentsForSequence.intervalToSegmentStates.values().stream()) + .flatMap(Collection::stream); + } + } + + /** + * Push the given segments in background. + * + * @param wrappedCommitter should not be null if you want to persist intermediate states + * @param segmentIdentifiers identifiers of the segments to be pushed + * + * @return a future for pushing segments + */ + ListenableFuture pushInBackground( + @Nullable final WrappedCommitter wrappedCommitter, + final Collection segmentIdentifiers + ) + { + log.info("Pushing segments in background: [%s]", Joiner.on(", ").join(segmentIdentifiers)); + + return Futures.transform( + appenderator.push(segmentIdentifiers, wrappedCommitter), + (Function) segmentsAndMetadata -> { + // Sanity check + final Set pushedSegments = segmentsAndMetadata.getSegments().stream() + .map(SegmentIdentifier::fromDataSegment) + .collect(Collectors.toSet()); + if (!pushedSegments.equals(Sets.newHashSet(segmentIdentifiers))) { + throw new ISE( + "WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].", + pushedSegments, + segmentIdentifiers + ); + } + + return segmentsAndMetadata; + }, + executor + ); + } + + /** + * Drop segments in background. The segments should be pushed (in batch ingestion) or published (in streaming + * ingestion) before being dropped. 
+ * + * @param segmentsAndMetadata result of pushing or publishing + * + * @return a future for dropping segments + */ + ListenableFuture dropInBackground(SegmentsAndMetadata segmentsAndMetadata) + { + log.info("Dropping segments[%s]", segmentsAndMetadata.getSegments()); + final ListenableFuture dropFuture = Futures.allAsList( + segmentsAndMetadata + .getSegments() + .stream() + .map(segment -> appenderator.drop(SegmentIdentifier.fromDataSegment(segment))) + .collect(Collectors.toList()) + ); + + return Futures.transform( + dropFuture, + (Function) x -> { + final Object metadata = segmentsAndMetadata.getCommitMetadata(); + return new SegmentsAndMetadata( + segmentsAndMetadata.getSegments(), + metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata() + ); + } + ); + } + + /** + * Publish segments in background. The segments should be dropped (in batch ingestion) or pushed (in streaming + * ingestion) before being published. + * + * @param segmentsAndMetadata result of dropping or pushing + * @param publisher transactional segment publisher + * + * @return a future for publishing segments + */ + ListenableFuture publishInBackground( + SegmentsAndMetadata segmentsAndMetadata, + TransactionalSegmentPublisher publisher + ) + { + return executor.submit( + () -> { + if (segmentsAndMetadata.getSegments().isEmpty()) { + log.info("Nothing to publish, skipping publish step."); + } else { + log.info( + "Publishing segments with commitMetadata[%s]: [%s]", + segmentsAndMetadata.getCommitMetadata(), + Joiner.on(", ").join(segmentsAndMetadata.getSegments()) + ); + + try { + final Object metadata = segmentsAndMetadata.getCommitMetadata(); + final boolean published = publisher.publishSegments( + ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), + metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata() + ); + + if (published) { + log.info("Published segments."); + } else { + log.info("Transaction failure while publishing segments, checking if someone else beat us to it."); + final Set segmentsIdentifiers = segmentsAndMetadata + .getSegments() + .stream() + .map(SegmentIdentifier::fromDataSegment) + .collect(Collectors.toSet()); + if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers) + .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) { + log.info("Our segments really do exist, awaiting handoff."); + } else { + throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments()); + } + } + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + return segmentsAndMetadata; + } + ); + } + + /** + * Clears out all our state and also calls {@link Appenderator#clear()} on the underlying Appenderator. + */ + public void clear() throws InterruptedException + { + synchronized (segments) { + segments.clear(); + } + appenderator.clear(); + } + + /** + * Closes this driver. Does not close the underlying Appenderator; you should do that yourself. + */ + @Override + public void close() + { + executor.shutdownNow(); + } + + /** + * Wrapped committer for BaseAppenderatorDriver. Used in only {@link StreamAppenderatorDriver} because batch ingestion + * doesn't need committing intermediate states. 
+ */
+ static class WrappedCommitter implements Committer
+ {
+   private final Committer delegate;
+   private final AppenderatorDriverMetadata metadata;
+
+   WrappedCommitter(Committer delegate, AppenderatorDriverMetadata metadata)
+   {
+     this.delegate = delegate;
+     this.metadata = metadata;
+   }
+
+   @Override
+   public Object getMetadata()
+   {
+     return metadata;
+   }
+
+   @Override
+   public void run()
+   {
+     delegate.run();
+   }
+ }
+
+ WrappedCommitter wrapCommitter(final Committer committer)
+ {
+   final AppenderatorDriverMetadata wrappedMetadata;
+   final Map<String, SegmentsForSequence> snapshot;
+   synchronized (segments) {
+     snapshot = ImmutableMap.copyOf(segments);
+   }
+
+   wrappedMetadata = new AppenderatorDriverMetadata(
+       ImmutableMap.copyOf(
+           Maps.transformValues(
+               snapshot,
+               (Function<SegmentsForSequence, List<SegmentWithState>>) input -> ImmutableList.copyOf(
+                   input.intervalToSegmentStates.values()
+                        .stream()
+                        .flatMap(Collection::stream)
+                        .collect(Collectors.toList())
+               )
+           )
+       ),
+       snapshot.entrySet()
+               .stream()
+               .collect(
+                   Collectors.toMap(
+                       Entry::getKey,
+                       e -> e.getValue().lastSegmentId
+                   )
+               ),
+       committer.getMetadata()
+   );
+
+   return new WrappedCommitter(committer, wrappedMetadata);
+ }
+
+ private Supplier<Committer> wrapCommitterSupplier(final Supplier<Committer> committerSupplier)
+ {
+   return () -> wrapCommitter(committerSupplier.get());
+ }
+}
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
new file mode 100644
index 000000000000..c67b31054bf1
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.appenderator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.data.input.InputRow;
+import io.druid.java.util.common.ISE;
+import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * This class is specialized for batch ingestion. In batch ingestion, the segment lifecycle is like:
+ *
+ * <pre>
+ * APPENDING -> PUSHED_AND_DROPPED -> PUBLISHED
+ * </pre>
+ *
+ * <ul>
+ * <li>APPENDING: Segment is available for appending.</li>
+ * <li>PUSHED_AND_DROPPED: Segment is pushed to deep storage and dropped from the local storage.</li>
+ * <li>PUBLISHED: Segment's metadata is published to the metastore.</li>
+ * </ul>
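+ *
+ * A minimal usage sketch ({@code appenderator}, {@code allocator}, {@code usedSegmentChecker}, {@code rows},
+ * {@code publisher}, and the timeout are assumed to be supplied by the caller):
+ *
+ * <pre>{@code
+ * try (final BatchAppenderatorDriver driver =
+ *          new BatchAppenderatorDriver(appenderator, allocator, usedSegmentChecker)) {
+ *   driver.startJob();
+ *   for (InputRow row : rows) {
+ *     if (!driver.add(row, "sequence_0").isOk()) {
+ *       throw new ISE("Failed to add row with timestamp[%s]", row.getTimestamp());
+ *     }
+ *   }
+ *   driver.pushAllAndClear(TimeUnit.MINUTES.toMillis(10));
+ *   driver.publishAll(publisher).get();
+ * }
+ * }</pre>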
    + */ +public class BatchAppenderatorDriver extends BaseAppenderatorDriver +{ + /** + * Create a driver. + * + * @param appenderator appenderator + * @param segmentAllocator segment allocator + * @param usedSegmentChecker used segment checker + */ + public BatchAppenderatorDriver( + Appenderator appenderator, + SegmentAllocator segmentAllocator, + UsedSegmentChecker usedSegmentChecker + ) + { + super(appenderator, segmentAllocator, usedSegmentChecker); + } + + /** + * This method always returns null because batch ingestion doesn't support restoring tasks on failures. + + * @return always null + */ + @Override + @Nullable + public Object startJob() + { + final Object metadata = appenderator.startJob(); + if (metadata != null) { + throw new ISE("Metadata should be null because BatchAppenderatorDriver never persists it"); + } + return null; + } + + /** + * Add a row. Must not be called concurrently from multiple threads. + * + * @param row the row to add + * @param sequenceName sequenceName for this row's segment + * + * @return {@link AppenderatorDriverAddResult} + * + * @throws IOException if there is an I/O error while allocating or writing to a segment + */ + public AppenderatorDriverAddResult add( + InputRow row, + String sequenceName + ) throws IOException + { + return append(row, sequenceName, null, false, true); + } + + /** + * Push and drop all segments in the {@link SegmentState#APPENDING} state. + * + * @param pushAndClearTimeoutMs timeout for pushing and dropping segments + * + * @return {@link SegmentsAndMetadata} for pushed and dropped segments + */ + public SegmentsAndMetadata pushAllAndClear(long pushAndClearTimeoutMs) + throws InterruptedException, ExecutionException, TimeoutException + { + final Collection sequences; + synchronized (segments) { + sequences = ImmutableList.copyOf(segments.keySet()); + } + + return pushAndClear(sequences, pushAndClearTimeoutMs); + } + + private SegmentsAndMetadata pushAndClear( + Collection sequenceNames, + long pushAndClearTimeoutMs + ) throws InterruptedException, ExecutionException, TimeoutException + { + final List segmentIdentifierList = getSegmentWithStates(sequenceNames) + .filter(segmentWithState -> segmentWithState.getState() == SegmentState.APPENDING) + .map(SegmentWithState::getSegmentIdentifier) + .collect(Collectors.toList()); + + final ListenableFuture future = Futures.transform( + pushInBackground(null, segmentIdentifierList), + this::dropInBackground + ); + + final SegmentsAndMetadata segmentsAndMetadata = pushAndClearTimeoutMs == 0L ? 
+ future.get() : + future.get(pushAndClearTimeoutMs, TimeUnit.MILLISECONDS); + + // Sanity check + final Map pushedSegmentIdToSegmentMap = segmentsAndMetadata + .getSegments() + .stream() + .collect(Collectors.toMap( + SegmentIdentifier::fromDataSegment, + dataSegment -> dataSegment + )); + + final Map requestedSegmentIdsForSequences = getSegmentWithStates(sequenceNames) + .filter(segmentWithState -> segmentWithState.getState() == SegmentState.APPENDING) + .collect(Collectors.toMap( + SegmentWithState::getSegmentIdentifier, + segmentWithState -> segmentWithState + )); + + if (!pushedSegmentIdToSegmentMap.keySet().equals(requestedSegmentIdsForSequences.keySet())) { + throw new ISE( + "Pushed segments[%s] are different from the requested ones[%s]", + pushedSegmentIdToSegmentMap.keySet(), + requestedSegmentIdsForSequences.keySet() + ); + } + + // State transition + requestedSegmentIdsForSequences.forEach( + (segmentId, segmentWithState) -> { + segmentWithState.pushAndDrop(pushedSegmentIdToSegmentMap.get(segmentId)); + } + ); + + return segmentsAndMetadata; + } + + /** + * Publish all segments. + * + * @param publisher segment publisher + * + * @return a {@link ListenableFuture} for the publish task + */ + public ListenableFuture publishAll(final TransactionalSegmentPublisher publisher) + { + final Map snapshot; + synchronized (segments) { + snapshot = ImmutableMap.copyOf(segments); + } + + return publishInBackground( + new SegmentsAndMetadata( + snapshot + .values() + .stream() + .flatMap(SegmentsForSequence::segmentStateStream) + .map(segmentWithState -> Preconditions.checkNotNull( + segmentWithState.getDataSegment(), + "dataSegment for segmentId[%s]", + segmentWithState.getSegmentIdentifier()) + ) + .collect(Collectors.toList()), + null + ), + publisher + ); + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentWithState.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentWithState.java new file mode 100644 index 000000000000..3a89c1593cf9 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentWithState.java @@ -0,0 +1,153 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.List; + +public class SegmentWithState +{ + /** + * Segment state transition is different in {@link BatchAppenderatorDriver} and {@link StreamAppenderatorDriver}. + * When a new segment is created, its state is {@link #APPENDING}. 
+ *
+ * - In stream ingestion, the state of some segments can be changed to the {@link #APPEND_FINISHED} state. Data is
+ *   not appended to these segments anymore, and they are waiting to be published.
+ *   See {@link StreamAppenderatorDriver#moveSegmentOut(String, List)}.
+ * - In batch ingestion, the state of some segments can be changed to the {@link #PUSHED_AND_DROPPED} state. These
+ *   segments are pushed and dropped from the local storage, but not published yet.
+ *   See {@link BatchAppenderatorDriver#pushAndClear(Collection, long)}.
+ *
+ * Note: If you need to add more states which are used differently in batch and streaming ingestion, consider moving
+ * SegmentState to {@link BatchAppenderatorDriver} and {@link StreamAppenderatorDriver}.
+ */
+ public enum SegmentState
+ {
+   APPENDING,
+   APPEND_FINISHED,    // only used in StreamAppenderatorDriver
+   PUSHED_AND_DROPPED; // only used in BatchAppenderatorDriver
+
+   @JsonCreator
+   public static SegmentState fromString(@JsonProperty String name)
+   {
+     if (name.equalsIgnoreCase("ACTIVE")) {
+       return APPENDING;
+     } else if (name.equalsIgnoreCase("INACTIVE")) {
+       return APPEND_FINISHED;
+     } else {
+       return SegmentState.valueOf(name);
+     }
+   }
+ }
+
+ private final SegmentIdentifier segmentIdentifier;
+ private SegmentState state;
+
+ /**
+  * Keeps the {@link DataSegment} object that was created for {@link #segmentIdentifier} when
+  * {@link BaseAppenderatorDriver#pushInBackground} is called.
+  */
+ @Nullable private DataSegment dataSegment;
+
+ static SegmentWithState newSegment(SegmentIdentifier segmentIdentifier)
+ {
+   return new SegmentWithState(segmentIdentifier, SegmentState.APPENDING, null);
+ }
+
+ static SegmentWithState newSegment(SegmentIdentifier segmentIdentifier, SegmentState state)
+ {
+   return new SegmentWithState(segmentIdentifier, state, null);
+ }
+
+ @JsonCreator
+ public SegmentWithState(
+     @JsonProperty("segmentIdentifier") SegmentIdentifier segmentIdentifier,
+     @JsonProperty("state") SegmentState state,
+     @JsonProperty("dataSegment") @Nullable DataSegment dataSegment)
+ {
+   this.segmentIdentifier = segmentIdentifier;
+   this.state = state;
+   this.dataSegment = dataSegment;
+ }
+
+ public void setState(SegmentState state)
+ {
+   this.state = state;
+ }
+
+ /**
+  * Change the segment state to {@link SegmentState#APPEND_FINISHED}. The current state should be
+  * {@link SegmentState#APPENDING}.
+  */
+ public void finishAppending()
+ {
+   checkStateTransition(this.state, SegmentState.APPENDING, SegmentState.APPEND_FINISHED);
+   this.state = SegmentState.APPEND_FINISHED;
+ }
+
+ /**
+  * Change the segment state to {@link SegmentState#PUSHED_AND_DROPPED}. The current state should be
+  * {@link SegmentState#APPENDING}. This method should be called after the segment of {@link #segmentIdentifier} is
+  * completely pushed and dropped.
+ * + * @param dataSegment pushed {@link DataSegment} + */ + public void pushAndDrop(DataSegment dataSegment) + { + checkStateTransition(this.state, SegmentState.APPENDING, SegmentState.PUSHED_AND_DROPPED); + this.state = SegmentState.PUSHED_AND_DROPPED; + this.dataSegment = dataSegment; + } + + @JsonProperty + public SegmentIdentifier getSegmentIdentifier() + { + return segmentIdentifier; + } + + @JsonProperty + public SegmentState getState() + { + return state; + } + + @JsonProperty + @Nullable + public DataSegment getDataSegment() + { + return dataSegment; + } + + private static void checkStateTransition(SegmentState actualFrom, SegmentState expectedFrom, SegmentState to) + { + Preconditions.checkState( + actualFrom == expectedFrom, + "Wrong state transition from [%s] to [%s]", + actualFrom, + to + ); + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java index f5d7ae1e1902..72a7f8b325c4 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import io.druid.timeline.DataSegment; +import javax.annotation.Nullable; import java.util.List; import java.util.Objects; @@ -34,13 +35,14 @@ public class SegmentsAndMetadata public SegmentsAndMetadata( List segments, - Object commitMetadata + @Nullable Object commitMetadata ) { this.segments = ImmutableList.copyOf(segments); this.commitMetadata = commitMetadata; } + @Nullable public Object getCommitMetadata() { return commitMetadata; diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java new file mode 100644 index 000000000000..a5b1a3fbdd65 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -0,0 +1,416 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.realtime.appenderator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import io.druid.data.input.Committer; +import io.druid.data.input.InputRow; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.query.SegmentDescriptor; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * This class is specialized for streaming ingestion. In streaming ingestion, the segment lifecycle is like: + * + *
+ * <pre>
+ * APPENDING -> APPEND_FINISHED -> PUBLISHED
+ * </pre>
+ *
+ * <ul>
+ * <li>APPENDING: Segment is available for appending.</li>
+ * <li>APPEND_FINISHED: Segment cannot be updated (data cannot be added anymore) and is waiting to be published.</li>
+ * <li>PUBLISHED: Segment is pushed to deep storage, its metadata is published to the metastore, and finally the
+ * segment is dropped from local storage.</li>
+ * </ul>
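+ *
+ * A minimal usage sketch ({@code appenderator}, {@code allocator}, {@code handoffNotifierFactory},
+ * {@code usedSegmentChecker}, {@code objectMapper}, {@code metrics}, {@code rows}, {@code committerSupplier},
+ * {@code publisher}, and {@code sequenceNames} are assumed to be supplied by the caller):
+ *
+ * <pre>{@code
+ * try (final StreamAppenderatorDriver driver = new StreamAppenderatorDriver(
+ *     appenderator, allocator, handoffNotifierFactory, usedSegmentChecker, objectMapper, metrics)) {
+ *   driver.startJob();
+ *   for (InputRow row : rows) {
+ *     driver.add(row, "sequence_0", committerSupplier, false, true);
+ *   }
+ *   driver.persist(committerSupplier.get());
+ *   driver.publishAndRegisterHandoff(publisher, committerSupplier.get(), sequenceNames).get();
+ * }
+ * }</pre>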
    + */ +public class StreamAppenderatorDriver extends BaseAppenderatorDriver +{ + private static final Logger log = new Logger(StreamAppenderatorDriver.class); + + private final SegmentHandoffNotifier handoffNotifier; + private final FireDepartmentMetrics metrics; + private final ObjectMapper objectMapper; + + /** + * Create a driver. + * + * @param appenderator appenderator + * @param segmentAllocator segment allocator + * @param handoffNotifierFactory handoff notifier factory + * @param usedSegmentChecker used segment checker + * @param objectMapper object mapper, used for serde of commit metadata + * @param metrics Firedepartment metrics + */ + public StreamAppenderatorDriver( + Appenderator appenderator, + SegmentAllocator segmentAllocator, + SegmentHandoffNotifierFactory handoffNotifierFactory, + UsedSegmentChecker usedSegmentChecker, + ObjectMapper objectMapper, + FireDepartmentMetrics metrics + ) + { + super(appenderator, segmentAllocator, usedSegmentChecker); + + this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory") + .createSegmentHandoffNotifier(appenderator.getDataSource()); + this.metrics = Preconditions.checkNotNull(metrics, "metrics"); + this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper"); + } + + @Override + @Nullable + public Object startJob() + { + handoffNotifier.start(); + + final AppenderatorDriverMetadata metadata = objectMapper.convertValue( + appenderator.startJob(), + AppenderatorDriverMetadata.class + ); + + log.info("Restored metadata[%s].", metadata); + + if (metadata != null) { + synchronized (segments) { + final Map lastSegmentIds = metadata.getLastSegmentIds(); + Preconditions.checkState( + metadata.getSegments().keySet().equals(lastSegmentIds.keySet()), + "Sequences for segment states and last segment IDs are not same" + ); + + final Map builders = new TreeMap<>(); + + for (Entry> entry : metadata.getSegments().entrySet()) { + final String sequenceName = entry.getKey(); + final SegmentsForSequenceBuilder builder = new SegmentsForSequenceBuilder(lastSegmentIds.get(sequenceName)); + builders.put(sequenceName, builder); + entry.getValue().forEach(builder::add); + } + + builders.forEach((sequence, builder) -> segments.put(sequence, builder.build())); + } + + return metadata.getCallerMetadata(); + } else { + return null; + } + } + + /** + * Add a row. Must not be called concurrently from multiple threads. + * + * @param row the row to add + * @param sequenceName sequenceName for this row's segment + * @param committerSupplier supplier of a committer associated with all data that has been added, including this row + * if {@param allowIncrementalPersists} is set to false then this will not be used + * @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence. 
+ *                                Should be set to false if replica tasks would index events in same order
+ * @param allowIncrementalPersists whether to allow persist to happen when maxRowsInMemory or intermediate persist
+ *                                 period threshold is hit
+ *
+ * @return {@link AppenderatorDriverAddResult}
+ *
+ * @throws IOException if there is an I/O error while allocating or writing to a segment
+ */
+ public AppenderatorDriverAddResult add(
+     final InputRow row,
+     final String sequenceName,
+     final Supplier<Committer> committerSupplier,
+     final boolean skipSegmentLineageCheck,
+     final boolean allowIncrementalPersists
+ ) throws IOException
+ {
+   return append(row, sequenceName, committerSupplier, skipSegmentLineageCheck, allowIncrementalPersists);
+ }
+
+ /**
+  * Move a set of identifiers out from "active", making way for newer segments.
+  * This method is to support KafkaIndexTask's legacy mode and will be removed in the future.
+  * See KafkaIndexTask.runLegacy().
+  */
+ public void moveSegmentOut(final String sequenceName, final List<SegmentIdentifier> identifiers)
+ {
+   synchronized (segments) {
+     final SegmentsForSequence activeSegmentsForSequence = segments.get(sequenceName);
+     if (activeSegmentsForSequence == null) {
+       throw new ISE("WTF?! Asked to remove segments for sequenceName[%s] which doesn't exist...", sequenceName);
+     }
+
+     for (final SegmentIdentifier identifier : identifiers) {
+       log.info("Moving segment[%s] out of active list.", identifier);
+       final long key = identifier.getInterval().getStartMillis();
+       if (activeSegmentsForSequence.get(key) == null || activeSegmentsForSequence.get(key).stream().noneMatch(
+           segmentWithState -> {
+             if (segmentWithState.getSegmentIdentifier().equals(identifier)) {
+               segmentWithState.finishAppending();
+               return true;
+             } else {
+               return false;
+             }
+           }
+       )) {
+         throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier);
+       }
+     }
+   }
+ }
+
+ /**
+  * Persist all data indexed through this driver so far. Blocks until complete.
+  *
+   * Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}.
+   *
+   * @param committer committer representing all data that has been added so far
+   *
+   * @return the persisted commitMetadata
+   */
+  public Object persist(final Committer committer) throws InterruptedException
+  {
+    try {
+      log.info("Persisting data.");
+      final long start = System.currentTimeMillis();
+      final Object commitMetadata = appenderator.persistAll(wrapCommitter(committer)).get();
+      log.info("Persisted pending data in %,dms.", System.currentTimeMillis() - start);
+      return commitMetadata;
+    }
+    catch (InterruptedException e) {
+      throw e;
+    }
+    catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Persist all data indexed through this driver so far. Returns a future of the persisted commitMetadata.
+   *
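+   * A non-blocking caller can attach a callback to the returned future instead of waiting on it. A minimal sketch
+   * using Guava's {@code Futures.addCallback} (the callback bodies are illustrative):
+   * <pre>{@code
+   * Futures.addCallback(
+   *     driver.persistAsync(committer),
+   *     new FutureCallback<Object>()
+   *     {
+   *       @Override
+   *       public void onSuccess(Object commitMetadata)
+   *       {
+   *         log.info("Persisted with metadata[%s].", commitMetadata);
+   *       }
+   *
+   *       @Override
+   *       public void onFailure(Throwable t)
+   *       {
+   *         log.error(t, "Failed to persist");
+   *       }
+   *     }
+   * );
+   * }</pre>
+   *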
+   * Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}.
+   *
+   * @param committer committer representing all data that has been added so far
+   *
+   * @return future containing the persisted commitMetadata
+   */
+  public ListenableFuture<Object> persistAsync(final Committer committer)
+  {
+    return appenderator.persistAll(wrapCommitter(committer));
+  }
+
+  /**
+   * Execute a task in the background to publish all segments corresponding to the given sequence names. The task
+   * internally pushes the segments to deep storage first, and then publishes the metadata to the metadata storage.
+   *
+   * @param publisher     segment publisher
+   * @param committer     committer
+   * @param sequenceNames a collection of sequence names to be published
+   *
+   * @return a {@link ListenableFuture} for the submitted task which removes published {@code sequenceNames} from
+   * {@code segments}
+   */
+  public ListenableFuture<SegmentsAndMetadata> publish(
+      final TransactionalSegmentPublisher publisher,
+      final Committer committer,
+      final Collection<String> sequenceNames
+  )
+  {
+    final List<SegmentIdentifier> theSegments = getSegmentWithStates(sequenceNames)
+        .map(SegmentWithState::getSegmentIdentifier)
+        .collect(Collectors.toList());
+
+    final ListenableFuture<SegmentsAndMetadata> publishFuture = Futures.transform(
+        pushInBackground(wrapCommitter(committer), theSegments),
+        (AsyncFunction<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> publishInBackground(
+            segmentsAndMetadata,
+            publisher
+        )
+    );
+
+    return Futures.transform(
+        publishFuture,
+        (Function<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> {
+          synchronized (segments) {
+            sequenceNames.forEach(segments::remove);
+          }
+          return segmentsAndMetadata;
+        }
+    );
+  }
+
+  /**
+   * Register the segments in the given {@link SegmentsAndMetadata} to be handed off and execute a background task
+   * which waits until the handoff completes.
+   *
+   * @param segmentsAndMetadata the result segments and metadata of
+   *                            {@link #publish(TransactionalSegmentPublisher, Committer, Collection)}
+   *
+   * @return a future containing null if the input segmentsAndMetadata is null.
+   * Otherwise, a {@link ListenableFuture} for the submitted task
+   * which returns {@link SegmentsAndMetadata} containing the segments successfully handed off and the caller metadata
+   * of {@link AppenderatorDriverMetadata}
+   */
+  public ListenableFuture<SegmentsAndMetadata> registerHandoff(SegmentsAndMetadata segmentsAndMetadata)
+  {
+    if (segmentsAndMetadata == null) {
+      return Futures.immediateFuture(null);
+
+    } else {
+      final List<SegmentIdentifier> waitingSegmentIdList = segmentsAndMetadata.getSegments().stream()
+                                                                              .map(SegmentIdentifier::fromDataSegment)
+                                                                              .collect(Collectors.toList());
+      final Object metadata = Preconditions.checkNotNull(segmentsAndMetadata.getCommitMetadata(), "commitMetadata");
+
+      if (waitingSegmentIdList.isEmpty()) {
+        return Futures.immediateFuture(
+            new SegmentsAndMetadata(
+                segmentsAndMetadata.getSegments(),
+                ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
+            )
+        );
+      }
+
+      log.info("Registering handoff of segments: [%s]", waitingSegmentIdList);
+
+      final SettableFuture<SegmentsAndMetadata> resultFuture = SettableFuture.create();
+      final AtomicInteger numRemainingHandoffSegments = new AtomicInteger(waitingSegmentIdList.size());
+
+      for (final SegmentIdentifier segmentIdentifier : waitingSegmentIdList) {
+        handoffNotifier.registerSegmentHandoffCallback(
+            new SegmentDescriptor(
+                segmentIdentifier.getInterval(),
+                segmentIdentifier.getVersion(),
+                segmentIdentifier.getShardSpec().getPartitionNum()
+            ),
+            MoreExecutors.sameThreadExecutor(),
+            () -> {
+              log.info("Segment[%s] successfully handed off, dropping.", segmentIdentifier);
+              metrics.incrementHandOffCount();
+
+              final ListenableFuture<?> dropFuture = appenderator.drop(segmentIdentifier);
+              Futures.addCallback(
+                  dropFuture,
+                  new FutureCallback<Object>()
+                  {
+                    @Override
+                    public void onSuccess(Object result)
+                    {
+                      if (numRemainingHandoffSegments.decrementAndGet() == 0) {
+                        log.info("Successfully handed off [%d] segments.", segmentsAndMetadata.getSegments().size());
+                        resultFuture.set(
+                            new SegmentsAndMetadata(
+                                segmentsAndMetadata.getSegments(),
+                                ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
+                            )
+                        );
+                      }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable e)
+                    {
+                      log.warn(e, "Failed to drop segment[%s]?!", segmentIdentifier);
+                      numRemainingHandoffSegments.decrementAndGet();
+                      resultFuture.setException(e);
+                    }
+                  }
+              );
+            }
+        );
+      }
+
+      return resultFuture;
+    }
+  }
+
+  public ListenableFuture<SegmentsAndMetadata> publishAndRegisterHandoff(
+      final TransactionalSegmentPublisher publisher,
+      final Committer committer,
+      final Collection<String> sequenceNames
+  )
+  {
+    return Futures.transform(
+        publish(publisher, committer, sequenceNames),
+        this::registerHandoff
+    );
+  }
+
+  @Override
+  public void close()
+  {
+    super.close();
+    handoffNotifier.close();
+  }
+
+  private static class SegmentsForSequenceBuilder
+  {
+    private final NavigableMap<Long, LinkedList<SegmentWithState>> intervalToSegmentStates;
+    private final String lastSegmentId;
+
+    SegmentsForSequenceBuilder(String lastSegmentId)
+    {
+      this.intervalToSegmentStates = new TreeMap<>();
+      this.lastSegmentId = lastSegmentId;
+    }
+
+    void add(SegmentWithState segmentWithState)
+    {
+      final SegmentIdentifier identifier = segmentWithState.getSegmentIdentifier();
+      final LinkedList<SegmentWithState> segmentsInInterval = intervalToSegmentStates.computeIfAbsent(
+          identifier.getInterval().getStartMillis(),
+          k -> new LinkedList<>()
+      );
+      // always keep APPENDING segments for an interval start millis in the front
+      if (segmentWithState.getState() == SegmentState.APPENDING) {
+        segmentsInInterval.addFirst(segmentWithState);
+      } else {
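+        // segments in any other state (e.g., APPEND_FINISHED) go to the back, so any APPENDING segment stays first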
+        segmentsInInterval.addLast(segmentWithState);
+      }
+    }
+
+    SegmentsForSequence build()
+    {
+      return new SegmentsForSequence(intervalToSegmentStates, lastSegmentId);
+    }
+  }
+}
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index de74f58966ac..359708a78c0a 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -21,6 +21,7 @@
 import io.druid.timeline.DataSegment;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Set;
@@ -35,6 +36,6 @@ public interface TransactionalSegmentPublisher
    */
   boolean publishSegments(
       Set<DataSegment> segments,
-      Object commitMetadata
+      @Nullable Object commitMetadata
   ) throws IOException;
 }
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
new file mode 100644
index 000000000000..c69c8e443a7d
--- /dev/null
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.appenderator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import io.druid.timeline.partition.NumberedShardSpec;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class BatchAppenderatorDriverTest
+{
+  private static final String DATA_SOURCE = "foo";
+  private static final String VERSION = "abc123";
+  private static final int MAX_ROWS_IN_MEMORY = 100;
+  private static final long TIMEOUT = 1000;
+
+  private static final List<InputRow> ROWS = Arrays.asList(
+      new MapBasedInputRow(
+          DateTimes.of("2000"),
+          ImmutableList.of("dim1"),
+          ImmutableMap.of("dim1", "foo", "met1", "1")
+      ),
+      new MapBasedInputRow(
+          DateTimes.of("2000T01"),
+          ImmutableList.of("dim1"),
+          ImmutableMap.of("dim1", "foo", "met1", 2.0)
+      ),
+      new MapBasedInputRow(
+          DateTimes.of("2000T01"),
+          ImmutableList.of("dim2"),
+          ImmutableMap.of("dim2", "bar", "met1", 2.0)
+      )
+  );
+
+  private SegmentAllocator allocator;
+  private AppenderatorTester appenderatorTester;
+  private BatchAppenderatorDriver driver;
+
+  @Before
+  public void setup()
+  {
+    appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
+    allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
+    driver = new BatchAppenderatorDriver(
+        appenderatorTester.getAppenderator(),
+        allocator,
+        new TestUsedSegmentChecker(appenderatorTester)
+    );
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    driver.clear();
+    driver.close();
+  }
+
+  @Test
+  public void testSimple() throws Exception
+  {
+    Assert.assertNull(driver.startJob());
+
+    for (InputRow row : ROWS) {
+      Assert.assertTrue(driver.add(row, "dummy").isOk());
+    }
+
+    checkSegmentStates(2, SegmentState.APPENDING);
+
+    driver.pushAllAndClear(TIMEOUT);
+
+    checkSegmentStates(2, SegmentState.PUSHED_AND_DROPPED);
+
+    final SegmentsAndMetadata published = driver.publishAll(makeOkPublisher()).get(
+        TIMEOUT,
+        TimeUnit.MILLISECONDS
+    );
+
+    Assert.assertEquals(
+        ImmutableSet.of(
+            new SegmentIdentifier(DATA_SOURCE, Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0)),
+            new SegmentIdentifier(DATA_SOURCE, Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0))
+        ),
+        published.getSegments()
+                 .stream()
+                 .map(SegmentIdentifier::fromDataSegment)
+                 .collect(Collectors.toSet())
+    );
+
+    Assert.assertNull(published.getCommitMetadata());
+  }
+
+  @Test
+  public void testIncrementalPush() throws Exception
+  {
+    Assert.assertNull(driver.startJob());
+
+    int i = 0;
+    for (InputRow row : ROWS) {
+      Assert.assertTrue(driver.add(row, "dummy").isOk());
+
+      checkSegmentStates(1, SegmentState.APPENDING);
+      checkSegmentStates(i, SegmentState.PUSHED_AND_DROPPED);
+
+      driver.pushAllAndClear(TIMEOUT);
+      checkSegmentStates(0, SegmentState.APPENDING);
+      checkSegmentStates(++i, SegmentState.PUSHED_AND_DROPPED);
+    }
+
+    final SegmentsAndMetadata published = driver.publishAll(makeOkPublisher()).get(
+        TIMEOUT,
+        TimeUnit.MILLISECONDS
+    );
+
+    Assert.assertEquals(
+        ImmutableSet.of(
+            new SegmentIdentifier(DATA_SOURCE, Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0)),
+            new SegmentIdentifier(DATA_SOURCE, Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0)),
+            new SegmentIdentifier(DATA_SOURCE, Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(1, 0))
+        ),
+        published.getSegments()
+                 .stream()
+                 .map(SegmentIdentifier::fromDataSegment)
+                 .collect(Collectors.toSet())
+    );
+
+    Assert.assertNull(published.getCommitMetadata());
+  }
+
+  @Test
+  public void testRestart()
+  {
+    Assert.assertNull(driver.startJob());
+    driver.close();
+    appenderatorTester.getAppenderator().close();
+
+    Assert.assertNull(driver.startJob());
+  }
+
+  private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState expectedState)
+  {
+    final SegmentsForSequence segmentsForSequence = driver.getSegments().get("dummy");
+    Assert.assertNotNull(segmentsForSequence);
+    final List<SegmentWithState> segmentWithStates = segmentsForSequence
+        .segmentStateStream()
+        .filter(segmentWithState -> segmentWithState.getState() == expectedState)
+        .collect(Collectors.toList());
+
+    Assert.assertEquals(expectedNumSegmentsInState, segmentWithStates.size());
+  }
+
+  static TransactionalSegmentPublisher makeOkPublisher()
+  {
+    return (segments, commitMetadata) -> true;
+  }
+}
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/SegmentWithStateTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/SegmentWithStateTest.java
new file mode 100644
index 000000000000..569eebdc8a11
--- /dev/null
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/SegmentWithStateTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class SegmentWithStateTest
+{
+  @Test
+  public void testSerde() throws IOException
+  {
+    final ObjectMapper objectMapper = new DefaultObjectMapper();
+    final byte[] bytes = objectMapper.writeValueAsBytes(SegmentWithState.SegmentState.APPEND_FINISHED);
+    Assert.assertEquals(
+        SegmentWithState.SegmentState.APPEND_FINISHED,
+        objectMapper.readValue(bytes, SegmentWithState.SegmentState.class)
+    );
+  }
+
+  @Test
+  public void testSerdeForBackwardCompatibility() throws IOException
+  {
+    final ObjectMapper objectMapper = new DefaultObjectMapper();
+    Assert.assertEquals(
+        SegmentWithState.SegmentState.APPENDING,
+        objectMapper.readValue("\"ACTIVE\"", SegmentWithState.SegmentState.class)
+    );
+    Assert.assertEquals(
+        SegmentWithState.SegmentState.APPEND_FINISHED,
+        objectMapper.readValue("\"INACTIVE\"", SegmentWithState.SegmentState.class)
+    );
+  }
+}
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
similarity index 94%
rename from server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java
rename to server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 1cc19f4c1458..a249ccb7f088 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -40,9 +40,9 @@
 import io.druid.query.SegmentDescriptor;
 import io.druid.segment.incremental.IndexSizeExceededException;
 import io.druid.segment.realtime.FireDepartmentMetrics;
-import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestCommitterSupplier;
-import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestSegmentAllocator;
-import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestSegmentHandoffNotifierFactory;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestCommitterSupplier;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentHandoffNotifierFactory;
 import io.druid.timeline.DataSegment;
 import org.hamcrest.CoreMatchers;
 import org.joda.time.Interval;
@@ -65,7 +65,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
-public class AppenderatorDriverFailTest
+public class StreamAppenderatorDriverFailTest
 {
   private static final String DATA_SOURCE = "foo";
   private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
@@ -91,7 +91,7 @@ public class AppenderatorDriverFailTest
   SegmentAllocator allocator;
   TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
-  AppenderatorDriver driver;
+  StreamAppenderatorDriver driver;
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
@@ -121,7 +121,7 @@ public void testFailDuringPersist() throws IOException, InterruptedException, Ti
         + "[[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123, "
         + "foo_2000-01-01T01:00:00.000Z_2000-01-01T02:00:00.000Z_abc123]]");
-    driver = new AppenderatorDriver(
+    driver = new StreamAppenderatorDriver(
         createPersistFailAppenderator(),
         allocator,
         segmentHandoffNotifierFactory,
@@ -139,11 +139,11 @@ public void testFailDuringPersist() throws IOException, InterruptedException, Ti
     for (int i = 0; i < ROWS.size(); i++) {
       committerSupplier.setMetadata(i + 1);
-      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
     driver.publish(
-        AppenderatorDriverTest.makeOkPublisher(),
+        StreamAppenderatorDriverTest.makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
     ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -158,7 +158,7 @@ public void testFailDuringPush() throws IOException, InterruptedException, Timeo
         + "[[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123, "
         + "foo_2000-01-01T01:00:00.000Z_2000-01-01T02:00:00.000Z_abc123]]");
-    driver = new AppenderatorDriver(
+    driver = new StreamAppenderatorDriver(
         createPushFailAppenderator(),
         allocator,
         segmentHandoffNotifierFactory,
@@ -176,11 +176,11 @@ public void testFailDuringPush() throws IOException, InterruptedException, Timeo
     for (int i = 0; i < ROWS.size(); i++) {
       committerSupplier.setMetadata(i + 1);
-      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
     driver.publish(
-        AppenderatorDriverTest.makeOkPublisher(),
+        StreamAppenderatorDriverTest.makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
     ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -195,7 +195,7 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo
         "Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]"
     );
-    driver = new AppenderatorDriver(
+    driver = new StreamAppenderatorDriver(
         createDropFailAppenderator(),
         allocator,
         segmentHandoffNotifierFactory,
@@ -213,11 +213,11 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo
     for (int i = 0; i < ROWS.size(); i++) {
       committerSupplier.setMetadata(i + 1);
-      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
     final SegmentsAndMetadata published = driver.publish(
-        AppenderatorDriverTest.makeOkPublisher(),
+        StreamAppenderatorDriverTest.makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
     ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
similarity index 90%
rename from server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java
rename to server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index d9681351c77e..ee92d2e2d7d0 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -27,8 +27,6 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
@@ -44,10 +42,7 @@
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import io.druid.timeline.DataSegment;
-import io.druid.timeline.TimelineObjectHolder;
-import io.druid.timeline.VersionedIntervalTimeline;
 import io.druid.timeline.partition.NumberedShardSpec;
-import io.druid.timeline.partition.PartitionChunk;
 import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Assert;
@@ -66,7 +61,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-public class AppenderatorDriverTest
+public class StreamAppenderatorDriverTest
 {
   private static final String DATA_SOURCE = "foo";
   private static final String VERSION = "abc123";
@@ -94,10 +89,10 @@ public class AppenderatorDriverTest
       )
   );
-  SegmentAllocator allocator;
-  AppenderatorTester appenderatorTester;
-  TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
-  AppenderatorDriver driver;
+  private SegmentAllocator allocator;
+  private AppenderatorTester appenderatorTester;
+  private TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
+  private StreamAppenderatorDriver driver;
   @Before
   public void setUp()
@@ -105,11 +100,11 @@ public void setUp()
     appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
     allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
     segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
-    driver = new AppenderatorDriver(
+    driver = new StreamAppenderatorDriver(
         appenderatorTester.getAppenderator(),
         allocator,
         segmentHandoffNotifierFactory,
-        new TestUsedSegmentChecker(),
+        new TestUsedSegmentChecker(appenderatorTester),
         OBJECT_MAPPER,
         new FireDepartmentMetrics()
     );
@@ -131,7 +126,7 @@ public void testSimple() throws Exception
     for (int i = 0; i < ROWS.size(); i++) {
       committerSupplier.setMetadata(i + 1);
-      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
     final SegmentsAndMetadata published = driver.publish(
@@ -177,7 +172,7 @@ public void testMaxRowsPerSegment() throws Exception
             2.0
         )
     );
-    final AppenderatorDriverAddResult addResult = driver.add(row, "dummy", committerSupplier);
+    final AppenderatorDriverAddResult addResult = driver.add(row, "dummy", committerSupplier, false, true);
     Assert.assertTrue(addResult.isOk());
     if (addResult.getNumRowsInSegment() > MAX_ROWS_PER_SEGMENT) {
       driver.moveSegmentOut("dummy", ImmutableList.of(addResult.getSegmentIdentifier()));
@@ -210,7 +205,7 @@ public void testHandoffTimeout() throws Exception
     for (int i = 0; i < ROWS.size(); i++) {
       committerSupplier.setMetadata(i + 1);
-      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
     final SegmentsAndMetadata published = driver.publish(
@@ -236,7 +231,7 @@ public void testPublishPerRow() throws IOException, InterruptedException, Timeou
     // Add the first row and publish immediately
     {
       committerSupplier.setMetadata(1);
-      Assert.assertTrue(driver.add(ROWS.get(0), "dummy", committerSupplier).isOk());
+      Assert.assertTrue(driver.add(ROWS.get(0), "dummy", committerSupplier, false, true).isOk());
       final SegmentsAndMetadata segmentsAndMetadata = driver.publishAndRegisterHandoff(
           makeOkPublisher(),
@@ -257,7 +252,7 @@ public void testPublishPerRow() throws IOException, InterruptedException, Timeou
     // Add the second and third rows and publish immediately
     for (int i = 1; i < ROWS.size(); i++) {
       committerSupplier.setMetadata(i + 1);
-      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
       final SegmentsAndMetadata segmentsAndMetadata = driver.publishAndRegisterHandoff(
           makeOkPublisher(),
@@ -302,11 +297,11 @@ public void testIncrementalHandoff() throws Exception
     Assert.assertNull(driver.startJob());
     committerSupplier.setMetadata(1);
-    Assert.assertTrue(driver.add(ROWS.get(0), "sequence_0", committerSupplier).isOk());
+    Assert.assertTrue(driver.add(ROWS.get(0), "sequence_0", committerSupplier, false, true).isOk());
     for (int i = 1; i < ROWS.size(); i++) {
       committerSupplier.setMetadata(i + 1);
-      Assert.assertTrue(driver.add(ROWS.get(i), "sequence_1", committerSupplier).isOk());
+      Assert.assertTrue(driver.add(ROWS.get(i), "sequence_1", committerSupplier, false, true).isOk());
     }
     final ListenableFuture<SegmentsAndMetadata> futureForSequence0 = driver.publishAndRegisterHandoff(
@@ -501,33 +496,4 @@ public void close()
     };
   }
 }
-
-  private class TestUsedSegmentChecker implements UsedSegmentChecker
-  {
-    @Override
-    public Set<DataSegment> findUsedSegments(Set<SegmentIdentifier> identifiers) throws IOException
-    {
-      final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
-      for (DataSegment dataSegment : appenderatorTester.getPushedSegments()) {
-        timeline.add(
-            dataSegment.getInterval(),
-            dataSegment.getVersion(),
-            dataSegment.getShardSpec().createChunk(dataSegment)
-        );
-      }
-
-      final Set<DataSegment> retVal = Sets.newHashSet();
-      for (SegmentIdentifier identifier : identifiers) {
-        for (TimelineObjectHolder<String, DataSegment> holder : timeline.lookup(identifier.getInterval())) {
-          for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
-            if (identifiers.contains(SegmentIdentifier.fromDataSegment(chunk.getObject()))) {
-              retVal.add(chunk.getObject());
-            }
-          }
-        }
-      }
-
-      return retVal;
-    }
-  }
 }
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java b/server/src/test/java/io/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java
new file mode 100644
index 000000000000..e2e7cd620801
--- /dev/null
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.appenderator;
+
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.TimelineObjectHolder;
+import io.druid.timeline.VersionedIntervalTimeline;
+import io.druid.timeline.partition.PartitionChunk;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class TestUsedSegmentChecker implements UsedSegmentChecker
+{
+  private final AppenderatorTester appenderatorTester;
+
+  public TestUsedSegmentChecker(AppenderatorTester appenderatorTester)
+  {
+    this.appenderatorTester = appenderatorTester;
+  }
+
+  @Override
+  public Set<DataSegment> findUsedSegments(Set<SegmentIdentifier> identifiers) throws IOException
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
+    for (DataSegment dataSegment : appenderatorTester.getPushedSegments()) {
+      timeline.add(
+          dataSegment.getInterval(),
+          dataSegment.getVersion(),
+          dataSegment.getShardSpec().createChunk(dataSegment)
+      );
+    }
+
+    final Set<DataSegment> retVal = Sets.newHashSet();
+    for (SegmentIdentifier identifier : identifiers) {
+      for (TimelineObjectHolder<String, DataSegment> holder : timeline.lookup(identifier.getInterval())) {
+        for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
+          if (identifiers.contains(SegmentIdentifier.fromDataSegment(chunk.getObject()))) {
+            retVal.add(chunk.getObject());
+          }
+        }
+      }
+    }
+
+    return retVal;
+  }
+}
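
For reference, the new BatchAppenderatorDriver introduced by this patch is driven in essentially the following order. This is a sketch distilled from BatchAppenderatorDriverTest#testSimple above, reusing its fixtures; appenderatorTester, allocator, TIMEOUT, and makeOkPublisher are the test doubles defined there, and rows is an illustrative input list:

    final BatchAppenderatorDriver driver = new BatchAppenderatorDriver(
        appenderatorTester.getAppenderator(),
        allocator,
        new TestUsedSegmentChecker(appenderatorTester)
    );

    driver.startJob(); // batch ingestion has no restored metadata, so this returns null

    for (InputRow row : rows) {
      driver.add(row, "mySequence"); // unlike the stream driver, no committer or lineage flags
    }

    // push segments to deep storage early and drop them from memory ...
    driver.pushAllAndClear(TIMEOUT);

    // ... then publish all pushed segments to the metadata store in one transaction
    final SegmentsAndMetadata published = driver.publishAll(makeOkPublisher())
                                                .get(TIMEOUT, TimeUnit.MILLISECONDS);

    driver.close();

Note the split between the two final steps: pushAllAndClear may run repeatedly during the task to push early, while publishAll performs the single transactional metadata commit at the end, which is the "early publishing to early pushing" fix named in this patch's subject.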