Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline;

import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

@State(Scope.Benchmark)
@Fork(value = 1, jvmArgsAppend = {"-XX:+UseG1GC"})
@Warmup(iterations = 10)
@Measurement(iterations = 10)
@BenchmarkMode({Mode.Throughput})
public class VersionedIntervalTimelineBenchmark
{
private static final String DATA_SOURCE = "dataSource";
private static final Interval TOTAL_INTERVAL = Intervals.of("2018/2019");
private static final double NEW_ROOT_GEN_SEGMENTS_RATIO_AFTER_COMPACTION = 0.1;
private static final double COMPACTED_SEGMENTS_RATIO_TO_INITIAL_SEGMENTS = 0.5;

@Param({"10", "100", "1000"})
private int numInitialRootGenSegmentsPerInterval;

@Param({"1", "5"})
private int numNonRootGenerations;

@Param({"false", "true"})
private boolean useSegmentLock;

@Param({"MONTH", "DAY"})
private GranularityType segmentGranularity;

private List<Interval> intervals;
private List<DataSegment> segments;
private VersionedIntervalTimeline<String, DataSegment> timeline;
private List<DataSegment> newSegments;

@Setup
public void setup()
{
final int numNewRootGenSegmentsAfterCompaction =
(int) (numInitialRootGenSegmentsPerInterval * NEW_ROOT_GEN_SEGMENTS_RATIO_AFTER_COMPACTION);
final int numCompactedSegments =
(int) (numInitialRootGenSegmentsPerInterval * COMPACTED_SEGMENTS_RATIO_TO_INITIAL_SEGMENTS);

intervals = Lists.newArrayList(segmentGranularity.getDefaultGranularity().getIterable(TOTAL_INTERVAL));
segments = new ArrayList<>(intervals.size() * numInitialRootGenSegmentsPerInterval);
Map<Interval, Integer> nextRootGenPartitionIds = new HashMap<>(intervals.size());
Map<Interval, Integer> nextNonRootGenPartitionIds = new HashMap<>(intervals.size());
Map<Interval, Short> nextMinorVersions = new HashMap<>(intervals.size());

DateTime majorVersion = DateTimes.nowUtc();

for (Interval interval : intervals) {
majorVersion = majorVersion.plus(1);
int nextRootGenPartitionId = 0;
int nextNonRootGenPartitionId = PartitionIds.NON_ROOT_GEN_START_PARTITION_ID;

// Generate root generation segments
for (int i = 0; i < numInitialRootGenSegmentsPerInterval; i++) {
segments.add(newSegment(interval, majorVersion.toString(), new NumberedShardSpec(nextRootGenPartitionId++, 0)));
}

for (int i = 0; i < numNonRootGenerations; i++) {
if (!useSegmentLock) {
majorVersion = majorVersion.plus(1);
nextRootGenPartitionId = 0;
}
// Compacted segments
for (int j = 0; j < numCompactedSegments; j++) {
if (useSegmentLock) {
segments.add(
newSegment(
interval,
majorVersion.toString(),
new NumberedOverwriteShardSpec(
nextNonRootGenPartitionId++,
0,
nextRootGenPartitionId,
(short) (i + 1),
(short) numCompactedSegments
)
)
);
} else {
segments.add(newSegment(interval, majorVersion.toString(), new NumberedShardSpec(nextRootGenPartitionId++, 0)));
}
}

// New segments
for (int j = 0; j < numNewRootGenSegmentsAfterCompaction; j++) {
segments.add(newSegment(interval, majorVersion.toString(), new NumberedShardSpec(nextRootGenPartitionId++, 0)));
}
}
nextRootGenPartitionIds.put(interval, nextRootGenPartitionId);
nextNonRootGenPartitionIds.put(interval, nextNonRootGenPartitionId);
nextMinorVersions.put(interval, (short) (numNonRootGenerations + 1));
}

timeline = VersionedIntervalTimeline.forSegments(segments);

newSegments = new ArrayList<>(200);

// Generate new appending segments
for (int i = 0; i < 100; i++) {
final Interval interval = intervals.get(ThreadLocalRandom.current().nextInt(intervals.size()));
final int rootPartitionId = nextRootGenPartitionIds.get(interval);

newSegments.add(
newSegment(
interval,
majorVersion.toString(),
new NumberedShardSpec(rootPartitionId, 0)
)
);
nextRootGenPartitionIds.put(interval, rootPartitionId + 1);
}

// Generate overwriting segments
if (!useSegmentLock) {
majorVersion = majorVersion.plus(1);
nextRootGenPartitionIds.keySet().forEach(interval -> nextRootGenPartitionIds.put(interval, 0));
}

final List<Interval> intervalCopies = new ArrayList<>(intervals);
for (int i = 0; i < 100 && !intervalCopies.isEmpty(); i++) {
final Interval interval = intervalCopies.remove(ThreadLocalRandom.current().nextInt(intervalCopies.size()));
int rootPartitionId = nextRootGenPartitionIds.remove(interval);
int nonRootPartitionId = nextNonRootGenPartitionIds.remove(interval);
final short minorVersion = nextMinorVersions.remove(interval);

for (int j = 0; j < numCompactedSegments; j++) {
if (useSegmentLock) {
newSegments.add(
newSegment(
interval,
majorVersion.toString(),
new NumberedOverwriteShardSpec(
nonRootPartitionId++,
0,
rootPartitionId,
minorVersion,
(short) numCompactedSegments
)
)
);
} else {
newSegments.add(
newSegment(
interval,
majorVersion.toString(),
new NumberedShardSpec(rootPartitionId++, 0)
)
);
}
}
}
}

@Benchmark
public void benchAdd(Blackhole blackhole)
{
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segments);
for (DataSegment newSegment : newSegments) {
timeline.add(
newSegment.getInterval(),
newSegment.getVersion(),
newSegment.getShardSpec().createChunk(newSegment)
);
}
}

@Benchmark
public void benchRemove(Blackhole blackhole)
{
final List<DataSegment> segmentsCopy = new ArrayList<>(segments);
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segmentsCopy);
final int numTests = (int) (segmentsCopy.size() * 0.1);
for (int i = 0; i < numTests; i++) {
final DataSegment segment = segmentsCopy.remove(ThreadLocalRandom.current().nextInt(segmentsCopy.size()));
blackhole.consume(
timeline.remove(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(segment)
)
);
}
}

@Benchmark
public void benchLookup(Blackhole blackhole)
{
final int intervalIndex = ThreadLocalRandom.current().nextInt(intervals.size() - 2);
final Interval queryInterval = new Interval(
intervals.get(intervalIndex).getStart(),
intervals.get(intervalIndex + 2).getEnd()
);
blackhole.consume(timeline.lookup(queryInterval));
}

@Benchmark
public void benchIsOvershadowed(Blackhole blackhole)
{
final DataSegment segment = segments.get(ThreadLocalRandom.current().nextInt(segments.size()));
blackhole.consume(timeline.isOvershadowed(segment.getInterval(), segment.getVersion(), segment));
}

@Benchmark
public void benchFindFullyOvershadowed(Blackhole blackhole)
{
blackhole.consume(timeline.findFullyOvershadowed());
}

private static DataSegment newSegment(Interval interval, String version, ShardSpec shardSpec)
{
return new DataSegment(
DATA_SOURCE,
interval,
version,
null,
null,
null,
shardSpec,
9,
10
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testKill() throws Exception
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);

Assert.assertEquals(segments, announced);

Assert.assertTrue(
getMetadataSegmentManager().markSegmentAsUnused(
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId().toString()
Expand Down