diff --git a/docs/content/design/index.md b/docs/content/design/index.md index fb48c6dcf6b8..f4399be1ec39 100644 --- a/docs/content/design/index.md +++ b/docs/content/design/index.md @@ -32,6 +32,30 @@ They each represent an axis of the data that we’ve chosen to slice across. Metrics are usually numeric values, and computations include operations such as count, sum, and mean. Also known as measures in standard OLAP terminology. +## Sharding the Data + +Druid shards are called `segments` and Druid always first shards data by time. In our compacted data set, we can create two segments, one for each hour of data. + +For example: + +Segment `sampleData_2011-01-01T01:00:00:00Z_2011-01-01T02:00:00:00Z_v1_0` contains + + 2011-01-01T01:00:00Z ultratrimfast.com google.com Male USA 1800 25 15.70 + 2011-01-01T01:00:00Z bieberfever.com google.com Male USA 2912 42 29.18 + + +Segment `sampleData_2011-01-01T02:00:00:00Z_2011-01-01T03:00:00:00Z_v1_0` contains + + 2011-01-01T02:00:00Z ultratrimfast.com google.com Male UK 1953 17 17.31 + 2011-01-01T02:00:00Z bieberfever.com google.com Male UK 3194 170 34.01 + +Segments are self-contained containers for the time interval of data they hold. Segments +contain data stored in compressed column orientations, along with the indexes for those columns. Druid queries only understand how to +scan segments. + +Segments are uniquely identified by a datasource, interval, version, and an optional partition number. +Examining our example segments, the segments are named following this convention: `dataSource_interval_version_partitionNumber` + ## Roll-up The individual events in our example data set are not very interesting because there may be trillions of such events. @@ -56,30 +80,15 @@ This storage reduction does come at a cost; as we roll up data, we lose the abil the rollup granularity is the minimum granularity you will be able to explore data at and events are floored to this granularity. Hence, Druid ingestion specs define this granularity as the `queryGranularity` of the data. The lowest supported `queryGranularity` is millisecond. -## Sharding the Data - -Druid shards are called `segments` and Druid always first shards data by time. In our compacted data set, we can create two segments, one for each hour of data. +### Roll-up modes -For example: - -Segment `sampleData_2011-01-01T01:00:00:00Z_2011-01-01T02:00:00:00Z_v1_0` contains - - 2011-01-01T01:00:00Z ultratrimfast.com google.com Male USA 1800 25 15.70 - 2011-01-01T01:00:00Z bieberfever.com google.com Male USA 2912 42 29.18 +Druid supports two roll-up modes, i.e., _perfect roll-up_ and _best-effort roll-up_. In the perfect roll-up mode, Druid guarantees that input data are perfectly aggregated at ingestion time. Meanwhile, in the best-effort roll-up, input data might not be perfectly aggregated and thus there can be multiple segments holding the rows which should belong to the same segment with the perfect roll-up since they have the same dimension value and their timestamps fall into the same interval. +The perfect roll-up mode encompasses an additional preprocessing step to determine intervals and shardSpecs before actual data ingestion if they are not specified in the ingestionSpec. This preprocessing step usually scans the entire input data which might increase the ingestion time. The [Hadoop indexing task](./ingestion/batch-ingestion.html) always runs with this perfect roll-up mode. -Segment `sampleData_2011-01-01T02:00:00:00Z_2011-01-01T03:00:00:00Z_v1_0` contains - - 2011-01-01T02:00:00Z ultratrimfast.com google.com Male UK 1953 17 17.31 - 2011-01-01T02:00:00Z bieberfever.com google.com Male UK 3194 170 34.01 - -Segments are self-contained containers for the time interval of data they hold. Segments -contain data stored in compressed column orientations, along with the indexes for those columns. Druid queries only understand how to -scan segments. - -Segments are uniquely identified by a datasource, interval, version, and an optional partition number. -Examining our example segments, the segments are named following this convention: `dataSource_interval_version_partitionNumber` +On the contrary, the best-effort roll-up mode doesn't require any preprocessing step, but the size of ingested data might be larger than that of the perfect roll-up. All types of [streaming indexing (i.e., realtime index task, kafka indexing service, ...)](./ingestion/stream-ingestion.html) run with this mode. +Finally, the [native index task](./ingestion/tasks.html) supports both modes and you can choose either one which fits to your application. ## Indexing the Data diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index 0647ff13f58b..76f1268dd4f2 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -114,10 +114,12 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |type|The task type, this should always be "index".|none|yes| |targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|75000|no| +|maxTotalRows|Total number of rows in segments waiting for being published. Used in determining when intermediate publish should occur.|150000|no| |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no| +|forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](./design/index.html). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. This flag cannot be used with either `appendToExisting` of IOConfig or `forceExtendableShardSpecs`. For more details, see the below __Segment publishing modes__ section.|false|no| |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| |publishTimeout|Milliseconds to wait for publishing segments. It must be >= 0, where 0 means to wait forever.|0|no| @@ -148,6 +150,16 @@ For Roaring bitmaps: |type|String|Must be `roaring`.|yes| |compressRunOnSerialization|Boolean|Use a run-length encoding where it is estimated as more space efficient.|no (default == `true`)| +#### Segment publishing modes + +While ingesting data using the Index task, it creates segments from the input data and publishes them. For segment publishing, the Index task supports two segment publishing modes, i.e., _bulk publishing mode_ and _incremental publishing mode_ for [perfect rollup and best-effort rollup](./design/index.html), respectively. + +In the bulk publishing mode, every segment is published at the very end of the index task. Until then, created segments are stored in the memory and local storage of the node running the index task. As a result, this mode might cause a problem due to limited storage capacity, and is not recommended to use in production. + +On the contrary, in the incremental publishing mode, segments are incrementally published, that is they can be published in the middle of the index task. More precisely, the index task collects data and stores created segments in the memory and disks of the node running that task until the total number of collected rows exceeds `maxTotalRows`. Once it exceeds, the index task immediately publishes all segments created until that moment, cleans all published segments up, and continues to ingest remaining data. + +To enable bulk publishing mode, `forceGuaranteedRollup` should be set in the TuningConfig. Note that this option cannot be used with either `forceExtendableShardSpecs` of TuningConfig or `appendToExisting` of IOConfig. + Segment Merging Tasks --------------------- diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 432ec25cbed5..c9bcc5954c2e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -596,7 +596,7 @@ public String apply(DataSegment input) } toolbox.getDataSegmentServerAnnouncer().unannounce(); - + return success(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/appenderator/ActionBasedSegmentAllocator.java b/indexing-service/src/main/java/io/druid/indexing/appenderator/ActionBasedSegmentAllocator.java index de82223c5161..2ead3b2c4e45 100644 --- a/indexing-service/src/main/java/io/druid/indexing/appenderator/ActionBasedSegmentAllocator.java +++ b/indexing-service/src/main/java/io/druid/indexing/appenderator/ActionBasedSegmentAllocator.java @@ -19,12 +19,12 @@ package io.druid.indexing.appenderator; +import io.druid.data.input.InputRow; import io.druid.indexing.common.actions.SegmentAllocateAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.segment.indexing.DataSchema; import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.appenderator.SegmentIdentifier; -import org.joda.time.DateTime; import java.io.IOException; @@ -44,7 +44,7 @@ public ActionBasedSegmentAllocator( @Override public SegmentIdentifier allocate( - final DateTime timestamp, + final InputRow row, final String sequenceName, final String previousSegmentId ) throws IOException @@ -52,7 +52,7 @@ public SegmentIdentifier allocate( return taskActionClient.submit( new SegmentAllocateAction( dataSchema.getDataSource(), - timestamp, + row.getTimestamp(), dataSchema.getGranularitySpec().getQueryGranularity(), dataSchema.getGranularitySpec().getSegmentGranularity(), sequenceName, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index cf705e64915a..a091d4b84b2b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -33,12 +33,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; +import com.google.common.util.concurrent.ListenableFuture; import io.druid.common.utils.JodaUtils; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; @@ -76,9 +74,9 @@ import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.AppenderatorConfig; +import io.druid.segment.realtime.appenderator.AppenderatorDriver; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; -import io.druid.segment.realtime.appenderator.AppenderatorDriver; import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; @@ -90,7 +88,6 @@ import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.ShardSpec; -import io.druid.timeline.partition.ShardSpecLookup; import org.codehaus.plexus.util.FileUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -99,13 +96,18 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public class IndexTask extends AbstractTask { @@ -185,12 +187,12 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception // Firehose temporary directory is automatically removed when this IndexTask completes. FileUtils.forceMkdir(firehoseTempDir); - final Map> shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir); + final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir); final String version; final DataSchema dataSchema; if (determineIntervals) { - Interval interval = JodaUtils.umbrellaInterval(shardSpecs.keySet()); + Interval interval = JodaUtils.umbrellaInterval(shardSpecs.getIntervals()); TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval)); version = lock.getVersion(); dataSchema = ingestionSchema.getDataSchema().withGranularitySpec( @@ -198,7 +200,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception .getGranularitySpec() .withIntervals( JodaUtils.condenseIntervals( - shardSpecs.keySet() + shardSpecs.getIntervals() ) ) ); @@ -214,61 +216,182 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception } } + private static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConfig tuningConfig) + { + Preconditions.checkState( + !(tuningConfig.isForceGuaranteedRollup() && + (tuningConfig.isForceExtendableShardSpecs() || ioConfig.isAppendToExisting())), + "Perfect rollup cannot be guaranteed with extendable shardSpecs" + ); + return tuningConfig.isForceGuaranteedRollup(); + } + /** - * Determines the number of shards for each interval using a hash of queryGranularity timestamp + all dimensions (i.e - * hash-based partitioning). In the future we may want to also support single-dimension partitioning. + * Determines intervals and shardSpecs for input data. This method first checks that it must determine intervals and + * shardSpecs by itself. Intervals must be determined if they are not specified in {@link GranularitySpec}. + * ShardSpecs must be determined if the perfect rollup must be guaranteed even though the number of shards is not + * specified in {@link IndexTuningConfig}. + *

+ * If both intervals and shardSpecs don't have to be determined, this method simply returns {@link ShardSpecs} for the + * given intervals. Here, if {@link IndexTuningConfig#numShards} is not specified, {@link NumberedShardSpec} is used. + *

+ * If one of intervals or shardSpecs need to be determined, this method reads the entire input for determining one of + * them. If the perfect rollup must be guaranteed, {@link HashBasedNumberedShardSpec} is used for hash partitioning + * of input data. In the future we may want to also support single-dimension partitioning. + * + * @return generated {@link ShardSpecs} representing a map of intervals and corresponding shard specs */ - private Map> determineShardSpecs( + private ShardSpecs determineShardSpecs( final TaskToolbox toolbox, final FirehoseFactory firehoseFactory, final File firehoseTempDir ) throws IOException { final ObjectMapper jsonMapper = toolbox.getObjectMapper(); + final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig(); + final IndexIOConfig ioConfig = ingestionSchema.getIOConfig(); + final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec(); - final Granularity queryGranularity = granularitySpec.getQueryGranularity(); - final boolean determineNumPartitions = ingestionSchema.getTuningConfig().getNumShards() == null; - final boolean determineIntervals = !ingestionSchema.getDataSchema() - .getGranularitySpec() - .bucketIntervals() - .isPresent(); - final Map> shardSpecs = Maps.newHashMap(); + final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent(); + // Guaranteed rollup means that this index task guarantees the 'perfect rollup' across the entire data set. + final boolean guaranteedRollup = isGuaranteedRollup(ioConfig, tuningConfig); + final boolean determineNumPartitions = tuningConfig.getNumShards() == null && guaranteedRollup; + final boolean useExtendableShardSpec = !guaranteedRollup; // if we were given number of shards per interval and the intervals, we don't need to scan the data if (!determineNumPartitions && !determineIntervals) { - log.info("numShards and intervals provided, skipping determine partition scan"); - final SortedSet intervals = ingestionSchema.getDataSchema() - .getGranularitySpec() - .bucketIntervals() - .get(); - final int numShards = ingestionSchema.getTuningConfig().getNumShards(); - - for (Interval interval : intervals) { - final List intervalShardSpecs = Lists.newArrayListWithCapacity(numShards); - if (numShards > 1) { - for (int i = 0; i < numShards; i++) { - intervalShardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, null, jsonMapper)); - } - } else { - intervalShardSpecs.add(NoneShardSpec.instance()); - } - shardSpecs.put(interval, intervalShardSpecs); + log.info("Skipping determine partition scan"); + return createShardSpecWithoutInputScan(jsonMapper, granularitySpec, tuningConfig, useExtendableShardSpec); + } else { + // determine intervals containing data and prime HLL collectors + return createShardSpecsFromInput( + jsonMapper, + ingestionSchema, + firehoseFactory, + firehoseTempDir, + granularitySpec, + tuningConfig, + determineIntervals, + determineNumPartitions, + useExtendableShardSpec + ); + } + } + + private static ShardSpecs createShardSpecWithoutInputScan( + ObjectMapper jsonMapper, + GranularitySpec granularitySpec, + IndexTuningConfig tuningConfig, + boolean useExtendableShardSpec + ) + { + final int numShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards(); + final BiFunction shardSpecCreateFn = getShardSpecCreateFunction( + useExtendableShardSpec, + numShards, + jsonMapper + ); + + final Map> intervalToShardSpecs = new HashMap<>(); + for (Interval interval : granularitySpec.bucketIntervals().get()) { + final List intervalShardSpecs = IntStream.range(0, numShards) + .mapToObj( + shardId -> shardSpecCreateFn.apply(shardId, numShards) + ) + .collect(Collectors.toList()); + intervalToShardSpecs.put(interval, intervalShardSpecs); + } + + if (useExtendableShardSpec) { + return createExtendableShardSpecs(intervalToShardSpecs); + } else { + return createNonExtendableShardSpecs(intervalToShardSpecs); + } + } + + private static ShardSpecs createShardSpecsFromInput( + ObjectMapper jsonMapper, + IndexIngestionSpec ingestionSchema, + FirehoseFactory firehoseFactory, + File firehoseTempDir, + GranularitySpec granularitySpec, + IndexTuningConfig tuningConfig, + boolean determineIntervals, + boolean determineNumPartitions, + boolean useExtendableShardSpec + ) throws IOException + { + log.info("Determining intervals and shardSpecs"); + long determineShardSpecsStartMillis = System.currentTimeMillis(); + + final Map> hllCollectors = collectIntervalsAndShardSpecs( + jsonMapper, + ingestionSchema, + firehoseFactory, + firehoseTempDir, + granularitySpec, + determineIntervals, + determineNumPartitions + ); + + final Map> intervalToShardSpecs = new HashMap<>(); + final int defaultNumShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards(); + for (final Map.Entry> entry : hllCollectors.entrySet()) { + final Interval interval = entry.getKey(); + final Optional collector = entry.getValue(); + + final int numShards; + if (determineNumPartitions) { + final long numRows = new Double(collector.get().estimateCardinality()).longValue(); + numShards = (int) Math.ceil((double) numRows / tuningConfig.getTargetPartitionSize()); + log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards); + } else { + numShards = defaultNumShards; + log.info("Creating [%,d] shards for interval [%s]", numShards, interval); } - return shardSpecs; + final BiFunction shardSpecCreateFn = getShardSpecCreateFunction( + useExtendableShardSpec, + numShards, + jsonMapper + ); + + final List intervalShardSpecs = IntStream.range(0, numShards) + .mapToObj( + shardId -> shardSpecCreateFn.apply(shardId, numShards) + ).collect(Collectors.toList()); + + intervalToShardSpecs.put(interval, intervalShardSpecs); + } + log.info("Found intervals and shardSpecs in %,dms", System.currentTimeMillis() - determineShardSpecsStartMillis); + + if (useExtendableShardSpec) { + return createExtendableShardSpecs(intervalToShardSpecs); + } else { + return createNonExtendableShardSpecs(intervalToShardSpecs); } + } - // determine intervals containing data and prime HLL collectors - final Map> hllCollectors = Maps.newHashMap(); + private static Map> collectIntervalsAndShardSpecs( + ObjectMapper jsonMapper, + IndexIngestionSpec ingestionSchema, + FirehoseFactory firehoseFactory, + File firehoseTempDir, + GranularitySpec granularitySpec, + boolean determineIntervals, + boolean determineNumPartitions + ) throws IOException + { + final Map> hllCollectors = new TreeMap<>( + Comparators.intervalsByStartThenEnd() + ); int thrownAway = 0; int unparseable = 0; + final Granularity queryGranularity = granularitySpec.getQueryGranularity(); - log.info("Determining intervals and shardSpecs"); - long determineShardSpecsStartMillis = System.currentTimeMillis(); - try (final Firehose firehose = firehoseFactory.connect( - ingestionSchema.getDataSchema().getParser(), - firehoseTempDir) + try ( + final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser(), firehoseTempDir) ) { while (firehose.hasMore()) { try { @@ -291,24 +414,24 @@ private Map> determineShardSpecs( interval = optInterval.get(); } - if (!determineNumPartitions) { + if (determineNumPartitions) { + if (!hllCollectors.containsKey(interval)) { + hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector())); + } + + List groupKey = Rows.toGroupKey( + queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(), + inputRow + ); + hllCollectors.get(interval).get() + .add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes()); + } else { // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent() // for the interval and don't instantiate a HLL collector if (!hllCollectors.containsKey(interval)) { - hllCollectors.put(interval, Optional.absent()); + hllCollectors.put(interval, Optional.absent()); } - continue; - } - - if (!hllCollectors.containsKey(interval)) { - hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector())); } - - List groupKey = Rows.toGroupKey( - queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(), - inputRow - ); - hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes()); } catch (ParseException e) { if (ingestionSchema.getTuningConfig().isReportParseExceptions()) { @@ -327,45 +450,121 @@ private Map> determineShardSpecs( if (unparseable > 0) { log.warn("Unable to parse [%,d] events", unparseable); } + return hllCollectors; + } - final ImmutableSortedMap> sortedMap = ImmutableSortedMap.copyOf( - hllCollectors, - Comparators.intervalsByStartThenEnd() - ); + private static ShardSpecs createNonExtendableShardSpecs(Map> intervalToShardSpecs) + { + return new ShardSpecs() + { + @Override + public Collection getIntervals() + { + return intervalToShardSpecs.keySet(); + } - for (final Map.Entry> entry : sortedMap.entrySet()) { - final Interval interval = entry.getKey(); - final Optional collector = entry.getValue(); + @Override + public ShardSpec getShardSpec(Interval interval, InputRow row) + { + final List shardSpecs = intervalToShardSpecs.get(interval); + if (shardSpecs == null || shardSpecs.isEmpty()) { + throw new ISE("Failed to get shardSpec for interval[%s]", interval); + } + return shardSpecs.get(0).getLookup(shardSpecs).getShardSpec(row.getTimestampFromEpoch(), row); + } - final int numShards; - if (determineNumPartitions) { - final long numRows = new Double(collector.get().estimateCardinality()).longValue(); - numShards = (int) Math.ceil((double) numRows / ingestionSchema.getTuningConfig().getTargetPartitionSize()); - log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards); - } else { - numShards = ingestionSchema.getTuningConfig().getNumShards(); - log.info("Creating [%,d] shards for interval [%s]", numShards, interval); + @Override + public void updateShardSpec(Interval interval) + { + // do nothing } + }; + } - final List intervalShardSpecs = Lists.newArrayListWithCapacity(numShards); - if (numShards > 1) { - for (int i = 0; i < numShards; i++) { - intervalShardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, null, jsonMapper)); - } + private static ShardSpecs createExtendableShardSpecs(Map> intervalToShardSpec) + { + final Map shardSpecMap = new HashMap<>(intervalToShardSpec.size()); + + intervalToShardSpec.forEach((interval, shardSpecs) -> { + Preconditions.checkState(shardSpecs.size() == 1); + shardSpecMap.put(interval, shardSpecs.get(0)); + }); + + return new ShardSpecs() + { + @Override + public Collection getIntervals() + { + return shardSpecMap.keySet(); + } + + @Override + public ShardSpec getShardSpec(Interval interval, InputRow row) + { + return shardSpecMap.get(interval); + } + + @Override + public void updateShardSpec(Interval interval) + { + final ShardSpec shardSpec = shardSpecMap.get(interval); + Preconditions.checkState( + shardSpec instanceof NumberedShardSpec, + "shardSpec[%s] must be NumberedShardSpec", + shardSpec.getClass().getCanonicalName() + ); + final NumberedShardSpec previous = (NumberedShardSpec) shardSpec; + Preconditions.checkNotNull(previous, "previous shardSpec for interval[%s] is null", interval); + shardSpecMap.put(interval, new NumberedShardSpec(previous.getPartitionNum() + 1, previous.getPartitions())); + } + }; + } + + private static BiFunction getShardSpecCreateFunction( + boolean useExtendableShardSpec, + Integer numShards, + ObjectMapper jsonMapper + ) + { + if (useExtendableShardSpec) { + // 0 partitions means there's no core partitions. See NumberedPartitionChunk.isStart() and + // NumberedPartitionChunk.isEnd(). + return (shardId, notUsed) -> new NumberedShardSpec(shardId, 0); + } else { + if (numShards == null) { + throw new ISE("numShards must not be null"); + } + + if (numShards == 1) { + return (shardId, totalNumShards) -> NoneShardSpec.instance(); } else { - intervalShardSpecs.add(NoneShardSpec.instance()); + return (shardId, totalNumShards) -> new HashBasedNumberedShardSpec(shardId, totalNumShards, null, jsonMapper); } - shardSpecs.put(interval, intervalShardSpecs); } - log.info("Found intervals and shardSpecs in %,dms", System.currentTimeMillis() - determineShardSpecsStartMillis); - - return shardSpecs; } + /** + * This method reads input data row by row and adds the read row to a proper segment using {@link AppenderatorDriver}. + * If there is no segment for the row, a new one is created. Segments can be published in the middle of reading inputs + * if one of below conditions are satisfied. + * + *
    + *
  • + * If the number of rows in a segment exceeds {@link IndexTuningConfig#targetPartitionSize} + *
  • + *
  • + * If the number of rows added to {@link AppenderatorDriver} so far exceeds {@link IndexTuningConfig#maxTotalRows} + *
  • + *
+ * + * At the end of this method, all the remaining segments are published. + * + * @return true if generated segments are successfully published, otherwise false + */ private boolean generateAndPublishSegments( final TaskToolbox toolbox, final DataSchema dataSchema, - final Map> shardSpecs, + final ShardSpecs shardSpecs, final String version, final FirehoseFactory firehoseFactory, final File firehoseTempDir @@ -377,7 +576,6 @@ private boolean generateAndPublishSegments( dataSchema, new RealtimeIOConfig(null, null, null), null ); final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); - final Map sequenceNameToShardSpecMap = Maps.newHashMap(); if (toolbox.getMonitorScheduler() != null) { toolbox.getMonitorScheduler().addMonitor( @@ -388,33 +586,43 @@ dataSchema, new RealtimeIOConfig(null, null, null), null ); } + final IndexIOConfig ioConfig = ingestionSchema.getIOConfig(); + final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig; + final long publishTimeout = tuningConfig.getPublishTimeout(); + final long maxRowsInAppenderator = tuningConfig.getMaxTotalRows(); + final int maxRowsInSegment = tuningConfig.getTargetPartitionSize() == null + ? Integer.MAX_VALUE + : tuningConfig.getTargetPartitionSize(); + final boolean isGuaranteedRollup = isGuaranteedRollup(ioConfig, tuningConfig); + final SegmentAllocator segmentAllocator; - if (ingestionSchema.getIOConfig().isAppendToExisting()) { + if (ioConfig.isAppendToExisting()) { segmentAllocator = new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema); } else { - segmentAllocator = new SegmentAllocator() - { - @Override - public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, String previousSegmentId) - throws IOException - { - Optional interval = granularitySpec.bucketInterval(timestamp); - if (!interval.isPresent()) { - throw new ISE("Could not find interval for timestamp [%s]", timestamp); - } - - ShardSpec shardSpec = sequenceNameToShardSpecMap.get(sequenceName); - if (shardSpec == null) { - throw new ISE("Could not find ShardSpec for sequenceName [%s]", sequenceName); - } + segmentAllocator = (row, sequenceName, previousSegmentId) -> { + final DateTime timestamp = row.getTimestamp(); + Optional maybeInterval = granularitySpec.bucketInterval(timestamp); + if (!maybeInterval.isPresent()) { + throw new ISE("Could not find interval for timestamp [%s]", timestamp); + } - return new SegmentIdentifier(getDataSource(), interval.get(), version, shardSpec); + final Interval interval = maybeInterval.get(); + final ShardSpec shardSpec = shardSpecs.getShardSpec(interval, row); + if (shardSpec == null) { + throw new ISE("Could not find shardSpec for interval[%s]", interval); } + + return new SegmentIdentifier(getDataSource(), interval, version, shardSpec); }; } + final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> { + final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments, null, null); + return toolbox.getTaskActionClient().submit(action).isSuccess(); + }; + try ( - final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema); + final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig); final AppenderatorDriver driver = newDriver( appenderator, toolbox, @@ -424,7 +632,6 @@ public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, Strin final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) ) { final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); - final Map shardSpecLookups = Maps.newHashMap(); if (driver.startJob() != null) { driver.clear(); @@ -446,41 +653,40 @@ public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, Strin } final Interval interval = optInterval.get(); - if (!shardSpecLookups.containsKey(interval)) { - final List intervalShardSpecs = shardSpecs.get(interval); - if (intervalShardSpecs == null || intervalShardSpecs.isEmpty()) { - throw new ISE("Failed to get shardSpec for interval[%s]", interval); - } - shardSpecLookups.put(interval, intervalShardSpecs.get(0).getLookup(intervalShardSpecs)); - } - - final ShardSpec shardSpec = shardSpecLookups.get(interval) - .getShardSpec(inputRow.getTimestampFromEpoch(), inputRow); - - final String sequenceName = StringUtils.format("index_%s_%s_%d", interval, version, shardSpec.getPartitionNum()); - - if (!sequenceNameToShardSpecMap.containsKey(sequenceName)) { - final ShardSpec shardSpecForPublishing = ingestionSchema.getTuningConfig().isForceExtendableShardSpecs() - || ingestionSchema.getIOConfig().isAppendToExisting() - ? new NumberedShardSpec( - shardSpec.getPartitionNum(), - shardSpecs.get(interval).size() - ) - : shardSpec; - - sequenceNameToShardSpecMap.put(sequenceName, shardSpecForPublishing); - } - + final ShardSpec shardSpec = shardSpecs.getShardSpec(interval, inputRow); + final String sequenceName = Appenderators.getSequenceName(interval, version, shardSpec); final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier); - if (!addResult.isOk()) { - throw new ISE("Could not allocate segment for row with timestamp[%s]", inputRow.getTimestamp()); + if (addResult.isOk()) { + // incremental segment publishment is allowed only when rollup don't have to be perfect. + if (!isGuaranteedRollup && + (addResult.getNumRowsInSegment() >= maxRowsInSegment || + addResult.getTotalNumRowsInAppenderator() >= maxRowsInAppenderator)) { + // There can be some segments waiting for being published even though any rows won't be added to them. + // If those segments are not published here, the available space in appenderator will be kept to be small + // which makes the size of segments smaller. + final SegmentsAndMetadata published = awaitPublish( + driver.publishAll( + publisher, + committerSupplier.get() + ), + publishTimeout + ); + published.getSegments().forEach(segment -> shardSpecs.updateShardSpec(segment.getInterval())); + // Even though IndexTask uses NoopHandoffNotifier which does nothing for segment handoff, + // the below code is needed to update the total number of rows added to the appenderator so far. + // See AppenderatorDriver.registerHandoff() and Appenderator.drop(). + // A hard-coded timeout is used here because the below get() is expected to return immediately. + driver.registerHandoff(published).get(30, TimeUnit.SECONDS); + } + } else { + throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp()); } fireDepartmentMetrics.incrementProcessed(); } catch (ParseException e) { - if (ingestionSchema.getTuningConfig().isReportParseExceptions()) { + if (tuningConfig.isReportParseExceptions()) { throw e; } else { fireDepartmentMetrics.incrementUnparseable(); @@ -492,31 +698,13 @@ public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, Strin driver.persist(committerSupplier.get()); } - final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher() - { - @Override - public boolean publishSegments(Set segments, Object commitMetadata) throws IOException - { - final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments, null, null); - return toolbox.getTaskActionClient().submit(action).isSuccess(); - } - }; - - final SegmentsAndMetadata published; - final long publishTimeout = ingestionSchema.getTuningConfig().getPublishTimeout(); - if (publishTimeout == 0) { - published = driver.publish( - publisher, - committerSupplier.get(), - sequenceNameToShardSpecMap.keySet() - ).get(); - } else { - published = driver.publish( - publisher, - committerSupplier.get(), - sequenceNameToShardSpecMap.keySet() - ).get(publishTimeout, TimeUnit.MILLISECONDS); - } + final SegmentsAndMetadata published = awaitPublish( + driver.publishAll( + publisher, + committerSupplier.get() + ), + publishTimeout + ); if (published == null) { log.error("Failed to publish segments, aborting!"); @@ -545,11 +733,29 @@ public String apply(DataSegment input) } } - private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox, DataSchema dataSchema) + private static SegmentsAndMetadata awaitPublish( + ListenableFuture publishFuture, + long publishTimeout + ) + throws ExecutionException, InterruptedException, TimeoutException + { + if (publishTimeout == 0) { + return publishFuture.get(); + } else { + return publishFuture.get(publishTimeout, TimeUnit.MILLISECONDS); + } + } + + private static Appenderator newAppenderator( + FireDepartmentMetrics metrics, + TaskToolbox toolbox, + DataSchema dataSchema, + IndexTuningConfig tuningConfig + ) { return Appenderators.createOffline( dataSchema, - ingestionSchema.getTuningConfig().withBasePersistDirectory(toolbox.getPersistDir()), + tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), metrics, toolbox.getSegmentPusher(), toolbox.getObjectMapper(), @@ -558,7 +764,7 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox ); } - private AppenderatorDriver newDriver( + private static AppenderatorDriver newDriver( final Appenderator appenderator, final TaskToolbox toolbox, final SegmentAllocator segmentAllocator, @@ -575,6 +781,38 @@ private AppenderatorDriver newDriver( ); } + /** + * This interface represents a map of (Interval, ShardSpec) and is used for easy shardSpec generation. The most + * important method is {@link #updateShardSpec(Interval)} which updates the map according to the type of shardSpec. + */ + private interface ShardSpecs + { + /** + * Return the key set of the underlying map. + * + * @return a set of intervals + */ + Collection getIntervals(); + + /** + * Return a shardSpec for the given interval and input row. + * + * @param interval interval for shardSpec + * @param row input row + * @return a shardSpec + */ + ShardSpec getShardSpec(Interval interval, InputRow row); + + /** + * Update the shardSpec of the given interval. When the type of shardSpecs is extendable, this method must update + * the shardSpec properly. For example, if the {@link NumberedShardSpec} is used, an implementation of this method + * may replace the shardSpec of the given interval with a new one having a greater partitionNum. + * + * @param interval interval for shardSpec to be updated + */ + void updateShardSpec(Interval interval); + } + public static class IndexIngestionSpec extends IngestionSpec { private final DataSchema dataSchema; @@ -592,10 +830,7 @@ public IndexIngestionSpec( this.dataSchema = dataSchema; this.ioConfig = ioConfig; - this.tuningConfig = tuningConfig == null - ? - new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null) - : tuningConfig; + this.tuningConfig = tuningConfig == null ? new IndexTuningConfig() : tuningConfig; } @Override @@ -654,11 +889,13 @@ public boolean isAppendToExisting() @JsonTypeName("index") public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig { - private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75000; + private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75_000; + private static final int DEFAULT_MAX_TOTAL_ROWS = 150_000; private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); private static final int DEFAULT_MAX_PENDING_PERSISTS = 0; private static final boolean DEFAULT_BUILD_V9_DIRECTLY = true; private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false; + private static final boolean DEFAULT_GUARANTEE_ROLLUP = false; private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; private static final long DEFAULT_PUBLISH_TIMEOUT = 0; @@ -666,11 +903,13 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private final Integer targetPartitionSize; private final int maxRowsInMemory; + private final int maxTotalRows; private final Integer numShards; private final IndexSpec indexSpec; private final File basePersistDirectory; private final int maxPendingPersists; private final boolean forceExtendableShardSpecs; + private final boolean forceGuaranteedRollup; private final boolean reportParseExceptions; private final long publishTimeout; @@ -678,6 +917,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi public IndexTuningConfig( @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxTotalRows") @Nullable Integer maxTotalRows, @JsonProperty("rowFlushBoundary") @Nullable Integer rowFlushBoundary_forBackCompatibility, // DEPRECATED @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @@ -685,6 +925,7 @@ public IndexTuningConfig( // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs, + @JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup, @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @JsonProperty("publishTimeout") @Nullable Long publishTimeout ) @@ -692,23 +933,32 @@ public IndexTuningConfig( this( targetPartitionSize, maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility, + maxTotalRows, numShards, indexSpec, maxPendingPersists, forceExtendableShardSpecs, + forceGuaranteedRollup, reportParseExceptions, publishTimeout, null ); } + private IndexTuningConfig() + { + this(null, null, null, null, null, null, null, null, null, null, null); + } + private IndexTuningConfig( @Nullable Integer targetPartitionSize, @Nullable Integer maxRowsInMemory, + @Nullable Integer maxTotalRows, @Nullable Integer numShards, @Nullable IndexSpec indexSpec, @Nullable Integer maxPendingPersists, @Nullable Boolean forceExtendableShardSpecs, + @Nullable Boolean forceGuaranteedRollup, @Nullable Boolean reportParseExceptions, @Nullable Long publishTimeout, @Nullable File basePersistDirectory @@ -725,17 +975,26 @@ private IndexTuningConfig( ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize); this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; + this.maxTotalRows = maxTotalRows == null + ? DEFAULT_MAX_TOTAL_ROWS + : maxTotalRows; this.numShards = numShards == null || numShards.equals(-1) ? null : numShards; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists; this.forceExtendableShardSpecs = forceExtendableShardSpecs == null ? DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS : forceExtendableShardSpecs; + this.forceGuaranteedRollup = forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup; this.reportParseExceptions = reportParseExceptions == null ? DEFAULT_REPORT_PARSE_EXCEPTIONS : reportParseExceptions; this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout; this.basePersistDirectory = basePersistDirectory; + + Preconditions.checkArgument( + !(this.forceExtendableShardSpecs && this.forceGuaranteedRollup), + "Perfect rollup cannot be guaranteed with extendable shardSpecs" + ); } public IndexTuningConfig withBasePersistDirectory(File dir) @@ -743,10 +1002,12 @@ public IndexTuningConfig withBasePersistDirectory(File dir) return new IndexTuningConfig( targetPartitionSize, maxRowsInMemory, + maxTotalRows, numShards, indexSpec, maxPendingPersists, forceExtendableShardSpecs, + forceGuaranteedRollup, reportParseExceptions, publishTimeout, dir @@ -766,6 +1027,12 @@ public int getMaxRowsInMemory() return maxRowsInMemory; } + @JsonProperty + public int getMaxTotalRows() + { + return maxTotalRows; + } + @JsonProperty public Integer getNumShards() { @@ -803,16 +1070,22 @@ public boolean isBuildV9Directly() } @JsonProperty - @Override - public boolean isReportParseExceptions() + public boolean isForceExtendableShardSpecs() { - return reportParseExceptions; + return forceExtendableShardSpecs; } @JsonProperty - public boolean isForceExtendableShardSpecs() + public boolean isForceGuaranteedRollup() { - return forceExtendableShardSpecs; + return forceGuaranteedRollup; + } + + @JsonProperty + @Override + public boolean isReportParseExceptions() + { + return reportParseExceptions; } @JsonProperty diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index c812d17fe67d..f248d5e93adf 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -39,6 +39,7 @@ import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; +import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; import io.druid.indexing.overlord.SegmentPublishResult; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.granularity.Granularities; @@ -105,7 +106,7 @@ public class IndexTaskTest 0 ); - private final IndexSpec indexSpec; + private static final IndexSpec indexSpec = new IndexSpec(); private final ObjectMapper jsonMapper; private IndexMergerV9 indexMergerV9; private IndexIO indexIO; @@ -113,7 +114,6 @@ public class IndexTaskTest public IndexTaskTest() { - indexSpec = new IndexSpec(); TestUtils testUtils = new TestUtils(); jsonMapper = testUtils.getTestObjectMapper(); indexMergerV9 = testUtils.getTestIndexMergerV9(); @@ -136,7 +136,13 @@ public void testDeterminePartitions() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, null, 2, null, false, false), + createIngestionSpec( + tmpDir, + null, + null, + createTuningConfig(2, null, false, true), + false + ), null, jsonMapper ); @@ -174,7 +180,13 @@ public void testForceExtendableShardSpecs() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, null, 2, null, true, false), + createIngestionSpec( + tmpDir, + null, + null, + createTuningConfig(2, null, true, false), + false + ), null, jsonMapper ); @@ -187,13 +199,11 @@ public void testForceExtendableShardSpecs() throws Exception Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NumberedShardSpec.class)); Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); - Assert.assertEquals(2, ((NumberedShardSpec) segments.get(0).getShardSpec()).getPartitions()); Assert.assertEquals("test", segments.get(1).getDataSource()); Assert.assertEquals(new Interval("2014/P1D"), segments.get(1).getInterval()); Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NumberedShardSpec.class)); Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum()); - Assert.assertEquals(2, ((NumberedShardSpec) segments.get(1).getShardSpec()).getPartitions()); } @Test @@ -219,9 +229,7 @@ public void testWithArbitraryGranularity() throws Exception Granularities.MINUTE, Collections.singletonList(new Interval("2014/2015")) ), - 10, - null, - false, + createTuningConfig(10, null, false, true), false ), null, @@ -256,9 +264,7 @@ public void testIntervalBucketing() throws Exception Granularities.HOUR, Collections.singletonList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z")) ), - 50, - null, - false, + createTuningConfig(50, null, false, true), false ), null, @@ -285,7 +291,13 @@ public void testNumShardsProvided() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, null, null, 1, false, false), + createIngestionSpec( + tmpDir, + null, + null, + createTuningConfig(null, 1, false, true), + false + ), null, jsonMapper ); @@ -316,7 +328,13 @@ public void testAppendToExisting() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, null, 2, null, false, true), + createIngestionSpec( + tmpDir, + null, + null, + createTuningConfig(2, null, false, false), + true + ), null, jsonMapper ); @@ -360,9 +378,7 @@ public void testIntervalNotSpecified() throws Exception Granularities.MINUTE, null ), - 2, - null, - false, + createTuningConfig(2, null, false, true), false ), null, @@ -423,9 +439,7 @@ public void testCSVFileWithHeader() throws Exception 0 ), null, - 2, - null, - false, + createTuningConfig(2, null, false, true), false ), null, @@ -475,9 +489,7 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception 0 ), null, - 2, - null, - false, + createTuningConfig(2, null, false, true), false ), null, @@ -493,6 +505,157 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); } + @Test + public void testWithSmallMaxTotalRows() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("2014-01-01T00:00:10Z,a,1\n"); + writer.write("2014-01-01T00:00:10Z,b,2\n"); + writer.write("2014-01-01T00:00:10Z,c,3\n"); + writer.write("2014-01-01T01:00:20Z,a,1\n"); + writer.write("2014-01-01T01:00:20Z,b,2\n"); + writer.write("2014-01-01T01:00:20Z,c,3\n"); + writer.write("2014-01-01T02:00:30Z,a,1\n"); + writer.write("2014-01-01T02:00:30Z,b,2\n"); + writer.write("2014-01-01T02:00:30Z,c,3\n"); + } + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + null, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.MINUTE, + null + ), + createTuningConfig(2, 2, 2, null, false, false, true), + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(6, segments.size()); + + for (int i = 0; i < 6; i++) { + final DataSegment segment = segments.get(i); + final Interval expectedInterval = new Interval(StringUtils.format("2014-01-01T0%d/PT1H", (i / 2))); + final int expectedPartitionNum = i % 2; + + Assert.assertEquals("test", segment.getDataSource()); + Assert.assertEquals(expectedInterval, segment.getInterval()); + Assert.assertTrue(segment.getShardSpec().getClass().equals(NumberedShardSpec.class)); + Assert.assertEquals(expectedPartitionNum, segment.getShardSpec().getPartitionNum()); + } + } + + @Test + public void testPerfectRollup() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + populateRollupTestData(tmpFile); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + null, + new UniformGranularitySpec( + Granularities.DAY, + Granularities.DAY, + true, + null + ), + createTuningConfig(3, 2, 2, null, false, true, true), + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(3, segments.size()); + + for (int i = 0; i < 3; i++) { + final DataSegment segment = segments.get(i); + final Interval expectedInterval = new Interval("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"); + + Assert.assertEquals("test", segment.getDataSource()); + Assert.assertEquals(expectedInterval, segment.getInterval()); + Assert.assertTrue(segment.getShardSpec().getClass().equals(HashBasedNumberedShardSpec.class)); + Assert.assertEquals(i, segment.getShardSpec().getPartitionNum()); + } + } + + @Test + public void testBestEffortRollup() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + populateRollupTestData(tmpFile); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + null, + new UniformGranularitySpec( + Granularities.DAY, + Granularities.DAY, + true, + null + ), + createTuningConfig(3, 2, 2, null, false, false, true), + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(5, segments.size()); + + for (int i = 0; i < 5; i++) { + final DataSegment segment = segments.get(i); + final Interval expectedInterval = new Interval("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"); + + Assert.assertEquals("test", segment.getDataSource()); + Assert.assertEquals(expectedInterval, segment.getInterval()); + Assert.assertTrue(segment.getShardSpec().getClass().equals(NumberedShardSpec.class)); + Assert.assertEquals(i, segment.getShardSpec().getPartitionNum()); + } + } + + private static void populateRollupTestData(File tmpFile) throws IOException + { + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("2014-01-01T00:00:10Z,a,1\n"); + writer.write("2014-01-01T01:00:20Z,a,1\n"); + writer.write("2014-01-01T00:00:10Z,b,2\n"); + writer.write("2014-01-01T00:00:10Z,c,3\n"); + writer.write("2014-01-01T01:00:20Z,b,2\n"); + writer.write("2014-01-01T02:00:30Z,a,1\n"); + writer.write("2014-01-01T02:00:30Z,b,2\n"); + writer.write("2014-01-01T01:00:20Z,c,3\n"); + writer.write("2014-01-01T02:00:30Z,c,3\n"); + } + } + @Test public void testIgnoreParseException() throws Exception { @@ -527,11 +690,8 @@ public void testIgnoreParseException() throws Exception 0 ), null, - 2, - null, - false, - false, - false // ignore parse exception + createTuningConfig(2, null, null, null, false, false, false), // ignore parse exception, + false ); IndexTask indexTask = new IndexTask( @@ -584,11 +744,8 @@ public void testReportParseException() throws Exception 0 ), null, - 2, - null, - false, - false, - true // report parse exception + createTuningConfig(2, null, null, null, false, false, true), // report parse exception + false ); IndexTask indexTask = new IndexTask( @@ -647,11 +804,8 @@ public void testCsvWithHeaderOfEmptyColumns() throws Exception 0 ), null, - 2, - null, - false, - false, - true // report parse exception + createTuningConfig(2, 1, null, null, false, true, true), // report parse exception + false ); IndexTask indexTask = new IndexTask( @@ -668,6 +822,10 @@ public void testCsvWithHeaderOfEmptyColumns() throws Exception Assert.assertEquals(2, segments.size()); Assert.assertNotEquals(segments.get(0), segments.get(1)); + for (DataSegment segment : segments) { + System.out.println(segment.getDimensions()); + } + for (int i = 0; i < 2; i++) { final DataSegment segment = segments.get(i); final Set dimensions = new HashSet<>(segment.getDimensions()); @@ -717,11 +875,8 @@ public void testCsvWithHeaderOfEmptyTimestamp() throws Exception 0 ), null, - 2, - null, - false, - false, - true // report parse exception + createTuningConfig(2, null, null, null, false, false, true), // report parse exception + false ); IndexTask indexTask = new IndexTask( @@ -820,34 +975,9 @@ private IndexTask.IndexIngestionSpec createIngestionSpec( File baseDir, ParseSpec parseSpec, GranularitySpec granularitySpec, - Integer targetPartitionSize, - Integer numShards, - boolean forceExtendableShardSpecs, + IndexTuningConfig tuningConfig, boolean appendToExisting ) - { - return createIngestionSpec( - baseDir, - parseSpec, - granularitySpec, - targetPartitionSize, - numShards, - forceExtendableShardSpecs, - appendToExisting, - true - ); - } - - private IndexTask.IndexIngestionSpec createIngestionSpec( - File baseDir, - ParseSpec parseSpec, - GranularitySpec granularitySpec, - Integer targetPartitionSize, - Integer numShards, - boolean forceExtendableShardSpecs, - boolean appendToExisting, - boolean reportParseException - ) { return new IndexTask.IndexIngestionSpec( new DataSchema( @@ -874,20 +1004,54 @@ private IndexTask.IndexIngestionSpec createIngestionSpec( baseDir, "druid*", null - ), appendToExisting + ), + appendToExisting ), - new IndexTask.IndexTuningConfig( - targetPartitionSize, - 1, - null, - numShards, - indexSpec, - null, - true, - forceExtendableShardSpecs, - reportParseException, - null - ) + tuningConfig + ); + } + + private static IndexTuningConfig createTuningConfig( + Integer targetPartitionSize, + Integer numShards, + boolean forceExtendableShardSpecs, + boolean forceGuaranteedRollup + ) + { + return createTuningConfig( + targetPartitionSize, + 1, + null, + numShards, + forceExtendableShardSpecs, + forceGuaranteedRollup, + true + ); + } + + private static IndexTuningConfig createTuningConfig( + Integer targetPartitionSize, + Integer maxRowsInMemory, + Integer maxTotalRows, + Integer numShards, + boolean forceExtendableShardSpecs, + boolean forceGuaranteedRollup, + boolean reportParseException + ) + { + return new IndexTask.IndexTuningConfig( + targetPartitionSize, + maxRowsInMemory, + maxTotalRows, + null, + numShards, + indexSpec, + null, + true, + forceExtendableShardSpecs, + forceGuaranteedRollup, + reportParseException, + null ); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 5a3161dec802..7b6b473b7e6c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -185,7 +185,7 @@ public void testIndexTaskSerde() throws Exception jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true, null) + new IndexTask.IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, false, null, null) ), null, jsonMapper @@ -248,7 +248,7 @@ public void testIndexTaskwithResourceSerde() throws Exception jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) ), null, jsonMapper diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 430ca49919ca..c86041f0fa33 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -655,7 +655,7 @@ public void testIndexTask() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) ), null, MAPPER @@ -713,7 +713,7 @@ public void testIndexTaskFailure() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) ), null, MAPPER @@ -1078,7 +1078,7 @@ public void testResumeTasks() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null) ), null, MAPPER diff --git a/integration-tests/src/main/java/io/druid/testing/utils/TestQueryHelper.java b/integration-tests/src/main/java/io/druid/testing/utils/TestQueryHelper.java index f8b854ebb550..d9a65b3a5136 100644 --- a/integration-tests/src/main/java/io/druid/testing/utils/TestQueryHelper.java +++ b/integration-tests/src/main/java/io/druid/testing/utils/TestQueryHelper.java @@ -134,7 +134,8 @@ public int countRows(String dataSource, String interval) ImmutableList.of( new LongSumAggregatorFactory("rows", "count") ) - ).granularity(Granularities.ALL) + ) + .granularity(Granularities.ALL) .intervals(interval) .build(); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java index abfdc9f828f3..0f808d9f2a0a 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java @@ -62,7 +62,8 @@ public interface Appenderator extends QuerySegmentWalker, Closeable * Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used * asynchronously. *

- * The add, clear, persist, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the + * metadata committed by Committer in sync. * * @param identifier the segment into which this row should be added * @param row the row to add @@ -92,12 +93,20 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe */ int getRowCount(final SegmentIdentifier identifier); + /** + * Returns the number of total rows in this appenderator. + * + * @return total number of rows + */ + int getTotalRowCount(); + /** * Drop all in-memory and on-disk data, and forget any previously-remembered commit metadata. This could be useful if, * for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been * cleared. This may take some time, since all pending persists must finish first. * - * The add, clear, persist, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the + * metadata committed by Committer in sync. */ void clear() throws InterruptedException; @@ -121,7 +130,8 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe * persist, but will actually be used asynchronously. Any metadata returned by the committer will be associated with * the data persisted to disk. *

- * The add, clear, persist, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the + * metadata committed by Committer in sync. * * @param identifiers segment identifiers to be persisted * @param committer a committer associated with all data that has been added to segments of the given identifiers so @@ -138,7 +148,8 @@ int add(SegmentIdentifier identifier, InputRow row, Supplier committe * be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to * disk. *

- * The add, clear, persist, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the + * metadata committed by Committer in sync. * * @param committer a committer associated with all data that has been added so far * @@ -155,7 +166,8 @@ default ListenableFuture persistAll(Committer committer) *

* After this method is called, you cannot add new data to any segments that were previously under construction. *

- * The add, clear, persist, persistAll, and push methods should all be called from the same thread. + * The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the + * metadata committed by Committer in sync. * * @param identifiers list of segments to push * @param committer a committer associated with all data that has been added so far diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java index 710f1f655df2..81d5642c3838 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java @@ -57,7 +57,6 @@ import java.util.Map; import java.util.NavigableMap; import java.util.Objects; -import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; @@ -142,9 +141,9 @@ public Object startJob() { handoffNotifier.start(); - final FiniteAppenderatorDriverMetadata metadata = objectMapper.convertValue( + final AppenderatorDriverMetadata metadata = objectMapper.convertValue( appenderator.startJob(), - FiniteAppenderatorDriverMetadata.class + AppenderatorDriverMetadata.class ); log.info("Restored metadata[%s].", metadata); @@ -215,12 +214,12 @@ public AppenderatorDriverAddResult add( Preconditions.checkNotNull(sequenceName, "sequenceName"); Preconditions.checkNotNull(committerSupplier, "committerSupplier"); - final SegmentIdentifier identifier = getSegment(row.getTimestamp(), sequenceName); + final SegmentIdentifier identifier = getSegment(row, sequenceName); if (identifier != null) { try { - final int numRows = appenderator.add(identifier, row, wrapCommitterSupplier(committerSupplier)); - return AppenderatorDriverAddResult.ok(identifier, numRows); + final int numRowsInMemory = appenderator.add(identifier, row, wrapCommitterSupplier(committerSupplier)); + return AppenderatorDriverAddResult.ok(identifier, numRowsInMemory, appenderator.getTotalRowCount()); } catch (SegmentNotWritableException e) { throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", identifier); @@ -265,7 +264,7 @@ public Object persist(final Committer committer) throws InterruptedException * * @return null if the input segmentsAndMetadata is null. Otherwise, a {@link ListenableFuture} for the submitted task * which returns {@link SegmentsAndMetadata} containing the segments successfully handed off and the metadata - * of the caller of {@link FiniteAppenderatorDriverMetadata} + * of the caller of {@link AppenderatorDriverMetadata} */ public ListenableFuture registerHandoff(SegmentsAndMetadata segmentsAndMetadata) { @@ -281,7 +280,7 @@ public ListenableFuture registerHandoff(SegmentsAndMetadata return Futures.immediateFuture( new SegmentsAndMetadata( segmentsAndMetadata.getSegments(), - ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) + ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) .getCallerMetadata() ) ); @@ -317,7 +316,7 @@ public void onSuccess(Object result) resultFuture.set( new SegmentsAndMetadata( segmentsAndMetadata.getSegments(), - ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) + ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) .getCallerMetadata() ) ); @@ -372,23 +371,24 @@ private SegmentIdentifier getActiveSegment(final DateTime timestamp, final Strin /** * Return a segment usable for "timestamp". May return null if no segment can be allocated. * - * @param timestamp data timestamp + * @param row input row * @param sequenceName sequenceName for potential segment allocation * * @return identifier, or null * * @throws IOException if an exception occurs while allocating a segment */ - private SegmentIdentifier getSegment(final DateTime timestamp, final String sequenceName) throws IOException + private SegmentIdentifier getSegment(final InputRow row, final String sequenceName) throws IOException { synchronized (activeSegments) { + final DateTime timestamp = row.getTimestamp(); final SegmentIdentifier existing = getActiveSegment(timestamp, sequenceName); if (existing != null) { return existing; } else { // Allocate new segment. final SegmentIdentifier newSegment = segmentAllocator.allocate( - timestamp, + row, sequenceName, lastSegmentIds.get(sequenceName) ); @@ -437,6 +437,27 @@ public void moveSegmentOut(final String sequenceName, final List publishAll( + final TransactionalSegmentPublisher publisher, + final Committer committer + ) + { + final List sequenceNames; + synchronized (activeSegments) { + sequenceNames = ImmutableList.copyOf(publishPendingSegments.keySet()); + } + return publish(publisher, committer, sequenceNames); + } + /** * Execute a task in background to publish all segments corresponding to the given sequence names. The task * internally pushes the segments to the deep storage first, and then publishes the metadata to the metadata storage. @@ -446,7 +467,7 @@ public void moveSegmentOut(final String sequenceName, final List publish( final TransactionalSegmentPublisher publisher, @@ -520,67 +541,59 @@ private ListenableFuture publish( final List segmentIdentifiers ) { - return publishExecutor.submit( - () -> { - long nTry = 0; - while (true) { - try { - log.info("Pushing segments: [%s]", Joiner.on(", ").join(segmentIdentifiers)); - final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(segmentIdentifiers, wrappedCommitter) - .get(); - - // Sanity check - final Set pushedSegments = segmentsAndMetadata.getSegments().stream() - .map(SegmentIdentifier::fromDataSegment) - .collect(Collectors.toSet()); - if (!pushedSegments.equals(Sets.newHashSet(segmentIdentifiers))) { - throw new ISE( - "WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].", - pushedSegments, - segmentIdentifiers - ); - } + log.info("Pushing segments: [%s]", Joiner.on(", ").join(segmentIdentifiers)); - log.info( - "Publishing segments with commitMetadata[%s]: [%s]", - segmentsAndMetadata.getCommitMetadata(), - Joiner.on(", ").join(segmentsAndMetadata.getSegments()) + return Futures.transform( + appenderator.push(segmentIdentifiers, wrappedCommitter), + (Function) segmentsAndMetadata -> { + // Sanity check + final Set pushedSegments = segmentsAndMetadata.getSegments().stream() + .map(SegmentIdentifier::fromDataSegment) + .collect(Collectors.toSet()); + if (!pushedSegments.equals(Sets.newHashSet(segmentIdentifiers))) { + throw new ISE( + "WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].", + pushedSegments, + segmentIdentifiers + ); + } + + if (segmentsAndMetadata.getSegments().isEmpty()) { + log.info("Nothing to publish, skipping publish step."); + } else { + log.info( + "Publishing segments with commitMetadata[%s]: [%s]", + segmentsAndMetadata.getCommitMetadata(), + Joiner.on(", ").join(segmentsAndMetadata.getSegments()) + ); + + try { + final boolean published = publisher.publishSegments( + ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), + ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() ); - if (segmentsAndMetadata.getSegments().isEmpty()) { - log.info("Nothing to publish, skipping publish step."); + if (published) { + log.info("Published segments."); } else { - final boolean published = publisher.publishSegments( - ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), - ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() - ); - - if (published) { - log.info("Published segments, awaiting handoff."); + log.info("Transaction failure while publishing segments, checking if someone else beat us to it."); + if (usedSegmentChecker.findUsedSegments(pushedSegments) + .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) { + log.info("Our segments really do exist, awaiting handoff."); } else { - log.info("Transaction failure while publishing segments, checking if someone else beat us to it."); - if (usedSegmentChecker.findUsedSegments(pushedSegments) - .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) { - log.info("Our segments really do exist, awaiting handoff."); - } else { - log.warn("Our segments don't exist, giving up."); - return null; - } + log.warn("Our segments don't exist, giving up."); + return null; } } - - return segmentsAndMetadata; - } - catch (InterruptedException e) { - throw e; } - catch (Exception e) { - final long sleepMillis = computeNextRetrySleep(++nTry); - log.warn(e, "Failed publish (try %d), retrying in %,dms.", nTry, sleepMillis); - Thread.sleep(sleepMillis); + catch (IOException e) { + throw Throwables.propagate(e); } } - } + + return segmentsAndMetadata; + }, + publishExecutor ); } @@ -607,9 +620,9 @@ private Supplier wrapCommitterSupplier(final Supplier comm private WrappedCommitter wrapCommitter(final Committer committer) { - final FiniteAppenderatorDriverMetadata wrappedMetadata; + final AppenderatorDriverMetadata wrappedMetadata; synchronized (activeSegments) { - wrappedMetadata = new FiniteAppenderatorDriverMetadata( + wrappedMetadata = new AppenderatorDriverMetadata( ImmutableMap.copyOf( Maps.transformValues( activeSegments, @@ -644,12 +657,4 @@ public void run() } }; } - - private static long computeNextRetrySleep(final long nTry) - { - final long baseSleepMillis = 1000; - final long maxSleepMillis = 60000; - final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * new Random().nextGaussian(), 0), 2); - return (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry)) * fuzzyMultiplier); - } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java index 3ce1f2816b9a..04a70173e99b 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java @@ -32,21 +32,31 @@ public class AppenderatorDriverAddResult { private final SegmentIdentifier segmentIdentifier; private final int numRowsInSegment; + private final long totalNumRowsInAppenderator; - public static AppenderatorDriverAddResult ok(SegmentIdentifier segmentIdentifier, int numRowsInSegment) + public static AppenderatorDriverAddResult ok( + SegmentIdentifier segmentIdentifier, + int numRowsInSegment, + long totalNumRowsInAppenderator + ) { - return new AppenderatorDriverAddResult(segmentIdentifier, numRowsInSegment); + return new AppenderatorDriverAddResult(segmentIdentifier, numRowsInSegment, totalNumRowsInAppenderator); } public static AppenderatorDriverAddResult fail() { - return new AppenderatorDriverAddResult(null, 0); + return new AppenderatorDriverAddResult(null, 0, 0); } - private AppenderatorDriverAddResult(@Nullable SegmentIdentifier segmentIdentifier, int numRowsInSegment) + private AppenderatorDriverAddResult( + @Nullable SegmentIdentifier segmentIdentifier, + int numRowsInSegment, + long totalNumRowsInAppenderator + ) { this.segmentIdentifier = segmentIdentifier; this.numRowsInSegment = numRowsInSegment; + this.totalNumRowsInAppenderator = totalNumRowsInAppenderator; } public boolean isOk() @@ -63,4 +73,9 @@ public int getNumRowsInSegment() { return numRowsInSegment; } + + public long getTotalNumRowsInAppenderator() + { + return totalNumRowsInAppenderator; + } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java similarity index 94% rename from server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java rename to server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java index f38229da45e3..edff72572d9e 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; -public class FiniteAppenderatorDriverMetadata +public class AppenderatorDriverMetadata { private final Map> activeSegments; private final Map> publishPendingSegments; @@ -33,7 +33,7 @@ public class FiniteAppenderatorDriverMetadata private final Object callerMetadata; @JsonCreator - public FiniteAppenderatorDriverMetadata( + public AppenderatorDriverMetadata( @JsonProperty("activeSegments") Map> activeSegments, @JsonProperty("publishPendingSegments") Map> publishPendingSegments, @JsonProperty("lastSegmentIds") Map lastSegmentIds, @@ -73,7 +73,7 @@ public Object getCallerMetadata() @Override public String toString() { - return "FiniteAppenderatorDriverMetadata{" + + return "AppenderatorDriverMetadata{" + "activeSegments=" + activeSegments + ", publishPendingSegments=" + publishPendingSegments + ", lastSegmentIds=" + lastSegmentIds + diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 7486fbddfc05..a2492e483f51 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -48,6 +48,7 @@ import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; +import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -114,8 +115,11 @@ public class AppenderatorImpl implements Appenderator private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline<>( String.CASE_INSENSITIVE_ORDER ); - private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger(); + private final QuerySegmentWalker texasRanger; + // This variable updated in add(), persist(), and drop() + private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger(); + private final AtomicInteger totalRows = new AtomicInteger(); private volatile ListeningExecutorService persistExecutor = null; private volatile ListeningExecutorService pushExecutor = null; @@ -214,7 +218,9 @@ public int add( throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier); } - rowsCurrentlyInMemory.addAndGet(sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd); + final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd; + rowsCurrentlyInMemory.addAndGet(numAddedRows); + totalRows.addAndGet(numAddedRows); if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush @@ -244,6 +250,12 @@ public int getRowCount(final SegmentIdentifier identifier) } } + @Override + public int getTotalRowCount() + { + return totalRows.get(); + } + @VisibleForTesting int getRowsInMemory() { @@ -352,10 +364,15 @@ public ListenableFuture persist(Collection identifier { final Map commitHydrants = Maps.newHashMap(); final List> indexesToPersist = Lists.newArrayList(); + int numPersistedRows = 0; for (SegmentIdentifier identifier : identifiers) { final Sink sink = sinks.get(identifier); + if (sink == null) { + throw new ISE("No sink for identifier: %s", identifier); + } final List hydrants = Lists.newArrayList(sink); commitHydrants.put(identifier, hydrants.size()); + numPersistedRows += sink.getNumRowsInMemory(); final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size(); @@ -431,7 +448,7 @@ public String apply(Map.Entry entry) resetNextFlush(); // NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes. - rowsCurrentlyInMemory.set(0); + rowsCurrentlyInMemory.addAndGet(-numPersistedRows); return future; } @@ -453,7 +470,7 @@ public ListenableFuture push( for (final SegmentIdentifier identifier : identifiers) { final Sink sink = sinks.get(identifier); if (sink == null) { - throw new NullPointerException("No sink for identifier: " + identifier); + throw new ISE("No sink for identifier: %s", identifier); } theSinks.put(identifier, sink); sink.finishWriting(); @@ -576,9 +593,14 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink tuningConfig.getIndexSpec() ); - DataSegment segment = dataSegmentPusher.push( - mergedFile, - sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)) + // Retry pushing segments because uploading to deep storage might fail especially for cloud storage types + final DataSegment segment = RetryUtils.retry( + () -> dataSegmentPusher.push( + mergedFile, + sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)) + ), + exception -> exception instanceof Exception, + 5 ); objectMapper.writeValue(descriptorFile, segment); @@ -867,6 +889,7 @@ private ListenableFuture abandonSegment( // Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks). rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory()); + totalRows.addAndGet(-sink.getNumRows()); // Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread. return Futures.transform( diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java index 282a967011c1..1014f1380a74 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java @@ -23,6 +23,7 @@ import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.java.util.common.StringUtils; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; @@ -31,6 +32,8 @@ import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.ShardSpec; +import org.joda.time.Interval; import java.io.IOException; import java.util.concurrent.ExecutorService; @@ -122,4 +125,8 @@ public void unannounceSegments(Iterable segments) throws IOExceptio ); } + public static String getSequenceName(Interval interval, String version, ShardSpec shardSpec) + { + return StringUtils.format("index_%s_%s_%d", interval, version, shardSpec.getPartitionNum()); + } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentAllocator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentAllocator.java index 94b5ba245ff6..15b2934f72fa 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentAllocator.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentAllocator.java @@ -19,7 +19,7 @@ package io.druid.segment.realtime.appenderator; -import org.joda.time.DateTime; +import io.druid.data.input.InputRow; import java.io.IOException; @@ -28,14 +28,14 @@ public interface SegmentAllocator /** * Allocates a new segment for a given timestamp. * - * @param timestamp timestamp of the event which triggered this allocation request + * @param row the event which triggered this allocation request * @param sequenceName sequenceName for this allocation * @param previousSegmentId segment identifier returned on the previous call to allocate for your sequenceName * * @return the pending segment identifier, or null if it was impossible to allocate a new segment */ SegmentIdentifier allocate( - DateTime timestamp, + InputRow row, String sequenceName, String previousSegmentId ) throws IOException; diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 07282e27686d..3341e23bc4e1 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -227,7 +227,7 @@ public String apply(@Nullable AggregatorFactory input) public int getNumRows() { synchronized (hydrantLock) { - return numRowsExcludingCurrIndex.get() + currHydrant.getIndex().size(); + return numRowsExcludingCurrIndex.get() + getNumRowsInMemory(); } } diff --git a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java index 7f019e351935..593dad190d00 100644 --- a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java @@ -29,6 +29,9 @@ import java.util.List; import java.util.Map; +/** + * An extendable linear shard spec. {@link #partitionNum} represents an unique id of a partition. + */ public class LinearShardSpec implements ShardSpec { private int partitionNum; diff --git a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java index e46e5e0bb875..0aca040b8ded 100644 --- a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java @@ -30,6 +30,13 @@ import java.util.List; import java.util.Map; +/** + * An extendable linear shard spec containing the information of core partitions. This class contains two variables of + * {@link #partitionNum} and {@link #partitions}, which represent the unique id of a partition and the number of core + * partitions, respectively. {@link #partitions} simply indicates that the atomic update is regarded as completed when + * {@link #partitions} partitions are successfully updated, and {@link #partitionNum} can go beyond it when some types + * of index tasks are trying to append to existing partitions. + */ public class NumberedShardSpec implements ShardSpec { @JsonIgnore diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java index 1203fcfa2f71..af2aedecd446 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java @@ -69,7 +69,7 @@ public class AppenderatorDriverFailTest { private static final String DATA_SOURCE = "foo"; private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); - private static final long PUBLISH_TIMEOUT = 1000; + private static final long PUBLISH_TIMEOUT = 5000; private static final List ROWS = ImmutableList.of( new MapBasedInputRow( @@ -114,45 +114,15 @@ public void tearDown() throws Exception @Test public void testFailDuringPersist() throws IOException, InterruptedException, TimeoutException, ExecutionException - { - expectedException.expect(TimeoutException.class); - - driver = new AppenderatorDriver( - createPersistFailAppenderator(), - allocator, - segmentHandoffNotifierFactory, - new NoopUsedSegmentChecker(), - OBJECT_MAPPER, - new FireDepartmentMetrics() - ); - - driver.startJob(); - - final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); - segmentHandoffNotifierFactory.setHandoffDelay(100); - - Assert.assertNull(driver.startJob()); - - for (int i = 0; i < ROWS.size(); i++) { - committerSupplier.setMetadata(i + 1); - Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk()); - } - - driver.publish( - AppenderatorDriverTest.makeOkPublisher(), - committerSupplier.get(), - ImmutableList.of("dummy") - ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); - } - - @Test - public void testInterruptDuringPush() throws IOException, InterruptedException, TimeoutException, ExecutionException { expectedException.expect(ExecutionException.class); - expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class)); + expectedException.expectCause(CoreMatchers.instanceOf(ISE.class)); + expectedException.expectMessage("Fail test while persisting segments" + + "[[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123, " + + "foo_2000-01-01T01:00:00.000Z_2000-01-01T02:00:00.000Z_abc123]]"); driver = new AppenderatorDriver( - createPushInterruptAppenderator(), + createPersistFailAppenderator(), allocator, segmentHandoffNotifierFactory, new NoopUsedSegmentChecker(), @@ -182,7 +152,11 @@ public void testInterruptDuringPush() throws IOException, InterruptedException, @Test public void testFailDuringPush() throws IOException, InterruptedException, TimeoutException, ExecutionException { - expectedException.expect(TimeoutException.class); + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(ISE.class)); + expectedException.expectMessage("Fail test while pushing segments" + + "[[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123, " + + "foo_2000-01-01T01:00:00.000Z_2000-01-01T02:00:00.000Z_abc123]]"); driver = new AppenderatorDriver( createPushFailAppenderator(), @@ -281,7 +255,6 @@ static Appenderator createDropFailAppenderator() } private static class FailableAppenderator implements Appenderator - { private final Map> rows = new HashMap<>(); @@ -356,6 +329,12 @@ public int getRowCount(SegmentIdentifier identifier) } } + @Override + public int getTotalRowCount() + { + return numRows; + } + @Override public void clear() throws InterruptedException { diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java index b6d7b9d778b0..13b7e52a32a1 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java @@ -405,13 +405,13 @@ public TestSegmentAllocator(String dataSource, Granularity granularity) @Override public SegmentIdentifier allocate( - final DateTime timestamp, + final InputRow row, final String sequenceName, final String previousSegmentId ) throws IOException { synchronized (counters) { - final long timestampTruncated = granularity.bucketStart(timestamp).getMillis(); + final long timestampTruncated = granularity.bucketStart(row.getTimestamp()).getMillis(); if (!counters.containsKey(timestampTruncated)) { counters.put(timestampTruncated, new AtomicInteger()); } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 16484c48cdae..316e86b6a199 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -129,7 +129,7 @@ public void testSimpleIngestion() throws Exception { SegmentIdentifier si = plumber.getSegmentsView().values().toArray(new SegmentIdentifier[1])[0]; Assert.assertEquals(3, - appenderator.getRowCount(si)); + appenderator.getRowCount(si)); appenderator.clear(); Assert.assertTrue(appenderator.getSegments().isEmpty()); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java index 8fb6619dd831..650069f47032 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java @@ -67,7 +67,7 @@ public class AppenderatorTest @Test public void testSimpleIngestion() throws Exception { - try (final AppenderatorTester tester = new AppenderatorTester(2)) { + try (final AppenderatorTester tester = new AppenderatorTester(2, true)) { final Appenderator appenderator = tester.getAppenderator(); boolean thrown; @@ -138,7 +138,7 @@ public SegmentIdentifier apply(DataSegment input) @Test public void testMaxRowsInMemory() throws Exception { - try (final AppenderatorTester tester = new AppenderatorTester(3)) { + try (final AppenderatorTester tester = new AppenderatorTester(3, true)) { final Appenderator appenderator = tester.getAppenderator(); final AtomicInteger eventCount = new AtomicInteger(0); final Supplier committerSupplier = new Supplier() @@ -180,6 +180,8 @@ public void run() Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier); Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get()); + Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); appenderator.close(); Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); } @@ -189,7 +191,7 @@ public void run() public void testRestoreFromDisk() throws Exception { final RealtimeTuningConfig tuningConfig; - try (final AppenderatorTester tester = new AppenderatorTester(2)) { + try (final AppenderatorTester tester = new AppenderatorTester(2, true)) { final Appenderator appenderator = tester.getAppenderator(); tuningConfig = tester.getTuningConfig(); @@ -231,7 +233,7 @@ public void run() appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 5), committerSupplier); appenderator.close(); - try (final AppenderatorTester tester2 = new AppenderatorTester(2, tuningConfig.getBasePersistDirectory())) { + try (final AppenderatorTester tester2 = new AppenderatorTester(2, tuningConfig.getBasePersistDirectory(), true)) { final Appenderator appenderator2 = tester2.getAppenderator(); Assert.assertEquals(ImmutableMap.of("eventCount", 4), appenderator2.startJob()); Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), appenderator2.getSegments()); @@ -240,10 +242,52 @@ public void run() } } + @Test(timeout = 10000L) + public void testTotalRowCount() throws Exception + { + try (final AppenderatorTester tester = new AppenderatorTester(3, true)) { + final Appenderator appenderator = tester.getAppenderator(); + final ConcurrentMap commitMetadata = new ConcurrentHashMap<>(); + final Supplier committerSupplier = committerSupplierFromConcurrentMap(commitMetadata); + + Assert.assertEquals(0, appenderator.getTotalRowCount()); + appenderator.startJob(); + Assert.assertEquals(0, appenderator.getTotalRowCount()); + appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier); + Assert.assertEquals(1, appenderator.getTotalRowCount()); + appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier); + Assert.assertEquals(2, appenderator.getTotalRowCount()); + + appenderator.persistAll(committerSupplier.get()).get(); + Assert.assertEquals(2, appenderator.getTotalRowCount()); + appenderator.drop(IDENTIFIERS.get(0)).get(); + Assert.assertEquals(1, appenderator.getTotalRowCount()); + appenderator.drop(IDENTIFIERS.get(1)).get(); + Assert.assertEquals(0, appenderator.getTotalRowCount()); + + appenderator.add(IDENTIFIERS.get(2), IR("2001", "bar", 1), committerSupplier); + Assert.assertEquals(1, appenderator.getTotalRowCount()); + appenderator.add(IDENTIFIERS.get(2), IR("2001", "baz", 1), committerSupplier); + Assert.assertEquals(2, appenderator.getTotalRowCount()); + appenderator.add(IDENTIFIERS.get(2), IR("2001", "qux", 1), committerSupplier); + Assert.assertEquals(3, appenderator.getTotalRowCount()); + appenderator.add(IDENTIFIERS.get(2), IR("2001", "bob", 1), committerSupplier); + Assert.assertEquals(4, appenderator.getTotalRowCount()); + + appenderator.persistAll(committerSupplier.get()).get(); + Assert.assertEquals(4, appenderator.getTotalRowCount()); + appenderator.drop(IDENTIFIERS.get(2)).get(); + Assert.assertEquals(0, appenderator.getTotalRowCount()); + + appenderator.close(); + Assert.assertEquals(0, appenderator.getTotalRowCount()); + } + } + @Test public void testQueryByIntervals() throws Exception { - try (final AppenderatorTester tester = new AppenderatorTester(2)) { + try (final AppenderatorTester tester = new AppenderatorTester(2, true)) { final Appenderator appenderator = tester.getAppenderator(); appenderator.startJob(); @@ -379,7 +423,7 @@ public void testQueryByIntervals() throws Exception @Test public void testQueryBySegments() throws Exception { - try (final AppenderatorTester tester = new AppenderatorTester(2)) { + try (final AppenderatorTester tester = new AppenderatorTester(2, true)) { final Appenderator appenderator = tester.getAppenderator(); appenderator.startJob(); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 71fd3714d23c..855ceb355e8e 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -89,12 +89,21 @@ public AppenderatorTester( final int maxRowsInMemory ) { - this(maxRowsInMemory, null); + this(maxRowsInMemory, null, false); } public AppenderatorTester( final int maxRowsInMemory, - final File basePersistDirectory + final boolean enablePushFailure + ) + { + this(maxRowsInMemory, null, enablePushFailure); + } + + public AppenderatorTester( + final int maxRowsInMemory, + final File basePersistDirectory, + final boolean enablePushFailure ) { objectMapper = new DefaultObjectMapper(); @@ -169,6 +178,8 @@ public int columnCacheSizeBytes() EmittingLogger.registerEmitter(emitter); dataSegmentPusher = new DataSegmentPusher() { + private boolean mustFail = true; + @Deprecated @Override public String getPathForHadoop(String dataSource) @@ -185,6 +196,12 @@ public String getPathForHadoop() @Override public DataSegment push(File file, DataSegment segment) throws IOException { + if (enablePushFailure && mustFail) { + mustFail = false; + throw new IOException("Push failure test"); + } else if (enablePushFailure) { + mustFail = true; + } pushedSegments.add(segment); return segment; }