diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java index 9271de28425b..674844724a48 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java @@ -93,9 +93,10 @@ public class SegmentsCostCache * The value of 1 day means that cost function of co-locating two segments which have 1 days between their intervals * is 0.5 of the cost, if the intervals are adjacent. If the distance is 2 days, then 0.25, etc. */ - private static final double HALF_LIFE_DAYS = 1.0; - private static final double LAMBDA = Math.log(2) / HALF_LIFE_DAYS; - private static final double MILLIS_FACTOR = TimeUnit.DAYS.toMillis(1) / LAMBDA; + private static final double HALF_LIFE_HOURS = 24.0; + private static final double LAMBDA = Math.log(2) / HALF_LIFE_HOURS; + static final double NORMALIZATION_FACTOR = 1 / (LAMBDA * LAMBDA); + private static final double MILLIS_FACTOR = TimeUnit.HOURS.toMillis(1) / LAMBDA; /** * LIFE_THRESHOLD is used to avoid calculations for segments that are "far" @@ -156,7 +157,7 @@ public double cost(DataSegment segment) cost += bucket.cost(segment); } - return cost; + return cost * NORMALIZATION_FACTOR; } public static Builder builder() @@ -262,17 +263,12 @@ private double addLeftCost(DataSegment dataSegment, double t0, double t1, int in double leftCost = 0.0; // add to cost all left-overlapping segments int leftIndex = index - 1; - while (leftIndex >= 0 - && sortedSegments.get(leftIndex).getInterval().overlaps(dataSegment.getInterval())) { + while (leftIndex >= 0) { double start = convertStart(sortedSegments.get(leftIndex), interval); double end = convertEnd(sortedSegments.get(leftIndex), interval); leftCost += CostBalancerStrategy.intervalCost(end - start, t0 - start, t1 - start); --leftIndex; } - // add left-non-overlapping segments - if (leftIndex >= 0) { - leftCost += leftSum[leftIndex] * (FastMath.exp(-t1) - FastMath.exp(-t0)); - } return leftCost; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCacheV3.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCacheV3.java new file mode 100644 index 000000000000..4c50de2a65c9 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCacheV3.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.cost; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Ordering; +import org.apache.commons.math3.util.FastMath; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.granularity.DurationGranularity; +import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.server.coordinator.CostBalancerStrategy; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * SegmentsCostCache provides faster way to calculate cost function proposed in {@link CostBalancerStrategy}. + * See https://github.com/apache/druid/pull/2972 for more details about the cost function. + * + * Joint cost for two segments (you can make formulas below readable by copy-pasting to + * https://www.codecogs.com/latex/eqneditor.php): + * + * cost(Y, Y) = \int_{x_0}^{x_1} \int_{y_0}^{y_1} e^{-\lambda |x-y|}dxdy + * or + * cost(Y, Y) = e^{y_0 + y_1} (e^{x_0} - e^{x_1})(e^{y_0} - e^{y_1}) (*) + * if x_0 <= x_1 <= y_0 <= y_1 + * (*) lambda coefficient is omitted for simplicity. + * + * For a group of segments {S_xi}, i = {0, n} total joint cost with segment S_y could be calculated as: + * + * cost(Y, Y) = \sum cost(X_i, Y) = e^{y_0 + y_1} (e^{y_0} - e^{y_1}) \sum (e^{xi_0} - e^{xi_1}) + * if xi_0 <= xi_1 <= y_0 <= y_1 + * and + * cost(Y, Y) = \sum cost(X_i, Y) = (e^{y_0} - e^{y_1}) \sum e^{xi_0 + xi_1} (e^{xi_0} - e^{xi_1}) + * if y_0 <= y_1 <= xi_0 <= xi_1 + * + * SegmentsCostCache stores pre-computed sums for a group of segments {S_xi}: + * + * 1) \sum (e^{xi_0} - e^{xi_1}) -> leftSum + * 2) \sum e^{xi_0 + xi_1} (e^{xi_0} - e^{xi_1}) -> rightSum + * + * so that calculation of joint cost function for segment S_y became a O(1 + m) complexity task, where m + * is the number of segments in {S_xi} that overlaps S_y. + * + * Segments are stored in buckets. Bucket is a subset of segments contained in SegmentsCostCache, so that + * startTime of all segments inside a bucket are in the same time interval (with some granularity): + * + * |------------------------|--------------------------|-----------------------|-------- .... + * t_0 t_0+D t_0 + 2D t0 + 3D .... + * S_x1 S_x2 S_x3 S_x4 S_x5 S_x6 S_x7 S_x8 S_x9 + * bucket1 bucket2 bucket3 + * + * + * Reasons to store segments in Buckets: + * + * Unlike SegmentsCostCache updates are fast, and we can make do without buckets ideally. + * Unfortunately, large values for (time - bucketStart) cause overflows + * A threshold (say 1Y) is used for bucketing, and this allows us to compute cost within O(logN) for a bucket + * + * If the interval duration exeeds it, we have to use CostBalancerStrategy#intervalCost over all the intervals + * This scenario has a complexity of O(M) where M is the number of "adhoc" buckets. + */ +public class SegmentsCostCacheV3 +{ + /** + * HALF_LIFE_HOURS defines how fast joint cost function tends to 0 as distance between segments' intervals increasing. + * The value of 1 day means that cost function of co-locating two segments which have 1 days between their intervals + * is 0.5 of the cost, if the intervals are adjacent. If the distance is 2 days, then 0.25, etc. + */ + private static final double HALF_LIFE_HOURS = 24.0; + private static final double LAMBDA = Math.log(2) / HALF_LIFE_HOURS; + static final double NORMALIZATION_FACTOR = 1 / (LAMBDA * LAMBDA); + private static final double MILLIS_FACTOR = TimeUnit.HOURS.toMillis(1) / LAMBDA; + + /** + * LIFE_THRESHOLD is used to avoid calculations for segments that are "far" + * from each other and thus cost ~ 0 for these segments + */ + private static final long LIFE_THRESHOLD = TimeUnit.DAYS.toMillis(30); + + // The max interval that can be added to a bucket + private static final long INTERVAL_THRESHOLD = TimeUnit.DAYS.toMillis(366); + // exp(BUCKET_INTERVAL + INTERVAL_THRESHOLD + 2 * LIFE_THRESHOLD) must be within limits + private static final long BUCKET_INTERVAL = TimeUnit.DAYS.toMillis(90); + private static final DurationGranularity BUCKET_GRANULARITY = new DurationGranularity(BUCKET_INTERVAL, 0); + + private static final Comparator BUCKET_INTERVAL_COMPARATOR = + Comparator.comparing(Bucket::getInterval, Comparators.intervalsByStartThenEnd()); + + private static final Ordering BUCKET_ORDERING = Ordering.from(BUCKET_INTERVAL_COMPARATOR); + + private final ArrayList sortedBuckets; + private final ArrayList intervals; + private final ArrayList> adhocNormalizedIntervals; + + private final int allGranularitySegmentCount; + private double allGranularitySegmentCost = -1; + + SegmentsCostCacheV3(ArrayList sortedBuckets, + ArrayList> adhocNormalizedIntervals, + int allGranularitySegmentCount) + { + this.sortedBuckets = Preconditions.checkNotNull(sortedBuckets, "buckets should not be null"); + Preconditions.checkArgument( + BUCKET_ORDERING.isOrdered(sortedBuckets), + "buckets must be ordered by interval" + ); + this.intervals = sortedBuckets.stream().map(Bucket::getInterval).collect(Collectors.toCollection(ArrayList::new)); + this.adhocNormalizedIntervals = Preconditions.checkNotNull(adhocNormalizedIntervals, "adhocIntervals should not be null"); + this.allGranularitySegmentCount = allGranularitySegmentCount; + } + + public double cost(DataSegment segment) + { + boolean allGranularity = isAllGranularity(segment); + if (allGranularity && allGranularitySegmentCost >= 0) { + return allGranularitySegmentCost; + } + + double cost = 0.0; + int index = Collections.binarySearch(intervals, segment.getInterval(), Comparators.intervalsByStartThenEnd()); + index = (index >= 0) ? index : -index - 1; + + for (ListIterator it = sortedBuckets.listIterator(index); it.hasNext(); ) { + Bucket bucket = it.next(); + if (!bucket.inCalculationInterval(segment)) { + break; + } + // O(logN) -> N segments per bucket + cost += bucket.cost(segment); + } + + for (ListIterator it = sortedBuckets.listIterator(index); it.hasPrevious(); ) { + Bucket bucket = it.previous(); + if (!bucket.inCalculationInterval(segment)) { + break; + } + // O(logN) -> N segments per bucket + cost += bucket.cost(segment); + } + + double start = segment.getInterval().getStartMillis() / MILLIS_FACTOR; + double end = segment.getInterval().getEndMillis() / MILLIS_FACTOR; + + // O(1) -> for ALL granularity segments + double allStart = JodaUtils.MIN_INSTANT / MILLIS_FACTOR; + double allEnd = JodaUtils.MAX_INSTANT / MILLIS_FACTOR; + cost += allGranularitySegmentCount * CostBalancerStrategy.intervalCost(allEnd - allStart, start - allStart, end - allStart); + + // O(M) -> M adhoc buckets + for (Pair adhoc : adhocNormalizedIntervals) { + cost += CostBalancerStrategy.intervalCost(adhoc.rhs - adhoc.lhs, start - adhoc.lhs, end - adhoc.lhs); + } + + cost *= NORMALIZATION_FACTOR; + + // store cost for all granularity adhoc bucket for faster computation + if (allGranularity) { + allGranularitySegmentCost = cost; + } + + return cost; + } + + public static Builder builder() + { + return new Builder(); + } + + private static boolean isAllGranularity(DataSegment segment) + { + return segment.getInterval().getStartMillis() == JodaUtils.MIN_INSTANT + && segment.getInterval().getEndMillis() == JodaUtils.MAX_INSTANT; + } + + + public static class Builder + { + private final NavigableMap buckets = new TreeMap<>(Comparators.intervalsByStartThenEnd()); + + private final HashSet allGranularitySegments = new HashSet<>(); + private final HashSet adhocSegments = new HashSet<>(); + + public Builder addSegment(DataSegment segment) + { + if (isAllGranularity(segment)) { + if (!allGranularitySegments.add(segment.getId())) { + throw new ISE("expect new segment"); + } + } else if (isAdhoc(segment)) { + if (!adhocSegments.add(segment.getId())) { + throw new ISE("expect new segment"); + } + } else { + Bucket.Builder builder = buckets.computeIfAbsent(getBucketInterval(segment), Bucket::builder); + builder.addSegment(segment); + } + return this; + } + + public Builder removeSegment(DataSegment segment) + { + if (isAllGranularity(segment)) { + allGranularitySegments.remove(segment.getId()); + } + if (isAdhoc(segment)) { + adhocSegments.remove(segment.getId()); + } else { + Interval interval = getBucketInterval(segment); + buckets.computeIfPresent( + interval, + // If there are no move segments, returning null in computeIfPresent() removes the interval from the buckets + // map + (i, builder) -> builder.removeSegment(segment).isEmpty() ? null : builder + ); + } + return this; + } + + public boolean isEmpty() + { + return buckets.isEmpty() && allGranularitySegments.isEmpty() && adhocSegments.isEmpty(); + } + + public SegmentsCostCacheV3 build() + { + final int allGranularitySegmentCount = allGranularitySegments.size(); + + final ArrayList> adhocNormalizedIntervals = new ArrayList<>(); + for (SegmentId segment : adhocSegments) { + double normalizedStart = segment.getInterval().getStartMillis() / MILLIS_FACTOR; + double normalizedEnd = segment.getInterval().getEndMillis() / MILLIS_FACTOR; + adhocNormalizedIntervals.add(Pair.of(normalizedStart, normalizedEnd)); + } + + return new SegmentsCostCacheV3( + buckets + .values() + .stream() + .map(Bucket.Builder::build) + .collect(Collectors.toCollection(ArrayList::new)), + adhocNormalizedIntervals, + allGranularitySegmentCount + ); + } + + private static Interval getBucketInterval(DataSegment segment) + { + return BUCKET_GRANULARITY.bucket(segment.getInterval().getStart()); + } + + private boolean isAdhoc(DataSegment segment) + { + double duration = segment.getInterval().getEndMillis() / MILLIS_FACTOR + - segment.getInterval().getStartMillis() / MILLIS_FACTOR; + return duration > INTERVAL_THRESHOLD / MILLIS_FACTOR; + } + } + + static class Bucket + { + private final Interval interval; + private final Interval calculationInterval; + + private final long START; + private final long END; + private final double END_VAL; + private final double END_EXP; + private final double END_EXP_INV; + + private final long[] start; + private final long[] end; + + private final double[] startValSum; + private final double[] startExpSum; + private final double[] startExpInvSum; + + private final double[] endValSum; + private final double[] endExpSum; + private final double[] endExpInvSum; + + Bucket(Interval interval, List> intervals) + { + this.interval = Preconditions.checkNotNull(interval, "interval"); + + this.calculationInterval = new Interval( + interval.getStart().minus(LIFE_THRESHOLD), + interval.getEnd().plus(LIFE_THRESHOLD) + ); + + int n = intervals.size(); + start = new long[n]; + end = new long[n]; + for (int i = 0; i < n; i++) { + start[i] = intervals.get(i).lhs; + end[i] = intervals.get(i).rhs; + } + Arrays.sort(start); + Arrays.sort(end); + + START = Math.max(interval.getStartMillis(), start[0]); + END = Math.min(interval.getEndMillis(), end[n - 1]); + + END_VAL = getVal(END); + END_EXP = FastMath.exp(END_VAL); + END_EXP_INV = FastMath.exp(-END_VAL); + + startValSum = new double[n + 1]; + startExpSum = new double[n + 1]; + startExpInvSum = new double[n + 1]; + for (int i = 0; i < n; i++) { + double startVal = getVal(start[i]); + startValSum[i + 1] = startValSum[i] + startVal; + startExpSum[i + 1] = startExpSum[i] + FastMath.exp(startVal); + startExpInvSum[i + 1] = startExpInvSum[i] + FastMath.exp(-startVal); + } + + endValSum = new double[n + 1]; + endExpSum = new double[n + 1]; + endExpInvSum = new double[n + 1]; + for (int i = 0; i < n; i++) { + double endVal = getVal(end[i]); + endValSum[i + 1] = endValSum[i] + endVal; + endExpSum[i + 1] = endExpSum[i] + FastMath.exp(endVal); + endExpInvSum[i + 1] = endExpInvSum[i] + FastMath.exp(-endVal); + } + } + + Interval getInterval() + { + return interval; + } + + boolean inCalculationInterval(DataSegment dataSegment) + { + return calculationInterval.overlaps(dataSegment.getInterval()); + } + + double cost(DataSegment dataSegment) + { + // avoid calculation for segments outside of LIFE_THRESHOLD + if (!inCalculationInterval(dataSegment)) { + throw new ISE("Segment is not within calculation interval"); + } + + // The following bounds help avoid overflow. The cost beyond LIFE_THRESHOLD is insignificant anyway + long x = Math.max(dataSegment.getInterval().getStartMillis(), START - LIFE_THRESHOLD); + long y = Math.min(dataSegment.getInterval().getEndMillis(), END + LIFE_THRESHOLD); + double cost = 0; + + cost += solve(x, y, start, startValSum, startExpSum, startExpInvSum); + cost -= solve(x, y, end, endValSum, endExpSum, endExpInvSum); + + return cost; + } + + // Sum of cost (, ) for all val in vals + private double solve(long x, long y, long[] vals, double[] sum, double[] expSum, double[] expInvSum) + { + + int n = vals.length; + + double xVal = getVal(x); + double xExp = FastMath.exp(xVal); + double xExpInv = FastMath.exp(-xVal); + + double yVal = getVal(y); + double yExp = FastMath.exp(yVal); + double yExpInv = FastMath.exp(-yVal); + + double cost = 0; + + if (END < x) { + + // val , END , x , y + cost += expSum[n] * yExpInv; + cost -= expSum[n] * xExpInv; + cost += n * END_EXP * xExpInv; + cost -= n * END_EXP * yExpInv; + + } else if (END > y) { + + int l = lowerBound(0, n - 1, x, vals); + int r = upperBound(0, n - 1, y, vals); + + // val < j , y , E + cost += 2 * (l + 1) * yVal; + cost -= 2 * (l + 1) * xVal; + cost += expSum[l + 1] * yExpInv; + cost -= expSum[l + 1] * xExpInv; + cost += (l + 1) * xExp * END_EXP_INV; + cost -= (l + 1) * yExp * END_EXP_INV; + + // x <= val <= y , E + cost += 2 * (r - l - 1) * yVal; + cost -= 2 * (sum[r] - sum[l + 1]); + cost += (r - l - 1) * xExp * END_EXP_INV; + cost -= xExp * (expInvSum[r] - expInvSum[l + 1]); + cost -= (r - l - 1) * yExp * END_EXP_INV; + cost += (expSum[r] - expSum[l + 1]) * yExpInv; + + // x , y < val , E + cost += (n - r) * xExp * END_EXP_INV; + cost -= xExp * (expInvSum[n] - expInvSum[r]); + cost -= (n - r) * yExp * END_EXP_INV; + cost += yExp * (expInvSum[n] - expInvSum[r]); + + } else { + + int l = lowerBound(0, n - 1, x, vals); + + // val < x , END , y + cost += 2 * (l + 1) * END_VAL; + cost -= 2 * (l + 1) * xVal; + cost += expSum[l + 1] * yExpInv; + cost -= expSum[l + 1] * xExpInv; + cost -= (l + 1) * END_EXP * yExpInv; + cost += (l + 1) * xExp * END_EXP_INV; + + // x <= val , END , y + cost += 2 * (n - l - 1) * END_VAL; + cost -= 2 * (sum[n] - sum[l + 1]); + cost += (n - l - 1) * xExp * END_EXP_INV; + cost -= xExp * (expInvSum[n] - expInvSum[l + 1]); + cost += (expSum[n] - expSum[l + 1]) * yExpInv; + cost -= (n - l - 1) * END_EXP * yExpInv; + } + + return cost; + } + + private double getVal(long millis) + { + return millis / MILLIS_FACTOR - START / MILLIS_FACTOR; + } + + private int lowerBound(int l, int r, long x, long[] a) + { + if (l == r) { + return a[l] < x ? r : l - 1; + } + int m = (l + r + 1) / 2; + if (a[m] < x) { + return lowerBound(m, r, x, a); + } else { + return lowerBound(l, m - 1, x, a); + } + } + + private int upperBound(int l, int r, long x, long[] a) + { + if (l == r) { + return a[r] > x ? l : r + 1; + } + int m = (l + r) / 2; + if (a[m] > x) { + return upperBound(l, m, x, a); + } else { + return upperBound(m + 1, r, x, a); + } + } + + public static Builder builder(Interval interval) + { + return new Builder(interval); + } + + static class Builder + { + protected final Interval interval; + private final Set segmentSet = new HashSet<>(); + + public Builder(Interval interval) + { + this.interval = interval; + } + + public Builder addSegment(DataSegment dataSegment) + { + if (!interval.contains(dataSegment.getInterval().getStartMillis())) { + throw new ISE("Failed to add segment to bucket: interval is not covered by this bucket"); + } + + if (!segmentSet.add(dataSegment.getId())) { + throw new ISE("expect new segment"); + } + + return this; + } + + public Builder removeSegment(DataSegment dataSegment) + { + segmentSet.remove(dataSegment.getId()); + + return this; + } + + public boolean isEmpty() + { + return segmentSet.isEmpty(); + } + + public Bucket build() + { + long bucketEndMillis = interval.getEndMillis(); + + List> intervals = new ArrayList<>(); + + for (SegmentId segment : segmentSet) { + Interval i = segment.getInterval(); + intervals.add(Pair.of(i.getStartMillis(), i.getEndMillis())); + bucketEndMillis = Math.max(bucketEndMillis, i.getEndMillis()); + } + + return new Bucket(Intervals.utc(interval.getStartMillis(), bucketEndMillis), intervals); + } + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/cost/SegmentsCostCacheTest.java b/server/src/test/java/org/apache/druid/server/coordinator/cost/SegmentsCostCacheTest.java index f0ae22094fe4..c24efbb02198 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/cost/SegmentsCostCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/cost/SegmentsCostCacheTest.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator.cost; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.server.coordinator.CostBalancerStrategy; import org.apache.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -33,25 +34,15 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import static org.apache.druid.server.coordinator.cost.SegmentsCostCache.NORMALIZATION_FACTOR; + public class SegmentsCostCacheTest { + private static final Random RANDOM = new Random(23894); private static final String DATA_SOURCE = "dataSource"; - private static final DateTime REFERENCE_TIME = DateTimes.of("2014-01-01T00:00:00"); - private static final double EPSILON = 0.00000001; - - @Test - public void segmentCacheTest() - { - SegmentsCostCache.Builder cacheBuilder = SegmentsCostCache.builder(); - cacheBuilder.addSegment(createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100)); - SegmentsCostCache cache = cacheBuilder.build(); - Assert.assertEquals( - 7.8735899489011E-4, - cache.cost(createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, -2), 100)), - EPSILON - ); - } + private static DateTime REFERENCE_TIME = DateTimes.of("2014-01-01T00:00:00"); + private static final double EPSILON = 0.0000001; @Test public void notInCalculationIntervalCostTest() @@ -146,32 +137,169 @@ public void multipleSegmentsCostTest() } @Test - public void randomSegmentsCostTest() + public void perfComparisonTest() { + final int n = 100000; + List dataSegments = new ArrayList<>(1000); - Random random = new Random(1); - for (int i = 0; i < 1000; ++i) { - dataSegments.add(createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, random.nextInt(20)), 100)); + for (int i = 0; i < n; ++i) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedRandomInterval(REFERENCE_TIME, 24 * RANDOM.nextInt(60)), 100)); + } + + DataSegment referenceSegment = createSegment("ANOTHER_DATA_SOURCE", shiftedRandomInterval(REFERENCE_TIME, 5), 100); + + SegmentsCostCache.Builder prototype = new SegmentsCostCache.Builder(); + + long start; + long end; + + start = System.currentTimeMillis(); + + dataSegments.forEach(prototype::addSegment); + SegmentsCostCache cache = prototype.build(); + + end = System.currentTimeMillis(); + System.out.println("Insertion time for " + n + " segments: " + (end - start) + " ms"); + + start = System.currentTimeMillis(); + + double origCost = 0; + for (DataSegment segment : dataSegments) { + origCost += CostBalancerStrategy.computeJointSegmentsCost(segment, referenceSegment); + } + + end = System.currentTimeMillis(); + System.out.println("Avg cost time: " + ((end - start) * 1000) + " us"); + + start = System.currentTimeMillis(); + + for (int i = 0; i < 1000; i++) { + cache.cost(referenceSegment); + } + + end = System.currentTimeMillis(); + System.out.println("Avg cache cost time: " + (end - start) + " us"); + + double cost = cache.cost(referenceSegment); + + Assert.assertEquals(1, origCost / cost, EPSILON); + } + + @Test + public void bucketCorrectnessTest() + { + List dataSegments = new ArrayList<>(); + + // Same as reference interval + for (int i = 0; i < 100; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(20), 10), 100)); + } + + // Overlapping intervals of larger size that enclose the reference interval + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(40) - 70, 100), 100)); + } + + // intervals of small size that are enclosed within the reference interval + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(40) - 20, 1), 100)); + } + + // intervals not intersecting, lying to its left + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedRandomInterval(REFERENCE_TIME, -90), 100)); + } + + // intervals not intersecting, lying to its right + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedRandomInterval(REFERENCE_TIME, 60), 100)); } DataSegment referenceSegment = createSegment("ANOTHER_DATA_SOURCE", shifted1HInterval(REFERENCE_TIME, 5), 100); SegmentsCostCache.Bucket.Builder prototype = SegmentsCostCache.Bucket.builder(new Interval( - REFERENCE_TIME.minusHours(1), - REFERENCE_TIME.plusHours(25) + REFERENCE_TIME.minusHours(90), REFERENCE_TIME.plusHours(90) )); + dataSegments.forEach(prototype::addSegment); SegmentsCostCache.Bucket bucket = prototype.build(); + double origCost = 0; + for (DataSegment segment : dataSegments) { + origCost += CostBalancerStrategy.computeJointSegmentsCost(segment, referenceSegment); + } + double cost = bucket.cost(referenceSegment); - Assert.assertEquals(0.7065117101966677, cost, EPSILON); + + Assert.assertEquals(NORMALIZATION_FACTOR, origCost / cost, EPSILON); + } + + @Test + public void overallCorrectnessTest() + { + List dataSegments = new ArrayList<>(); + + // Same as reference interval + for (int i = 0; i < 100; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(20), 10), 100)); + } + + // Overlapping intervals of larger size that enclose the reference interval + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(40) - 70, 100), 100)); + } + + // intervals of small size that are enclosed within the reference interval + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(40) - 20, 1), 100)); + } + + // intervals not intersecting, lying to its left + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedRandomInterval(REFERENCE_TIME, -90), 100)); + } + + // intervals not intersecting, lying to its right + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedRandomInterval(REFERENCE_TIME, 60), 100)); + } + + DataSegment referenceSegment = createSegment("ANOTHER_DATA_SOURCE", shifted1HInterval(REFERENCE_TIME, 5), 100); + + SegmentsCostCache.Builder prototype = new SegmentsCostCache.Builder(); + + dataSegments.forEach(prototype::addSegment); + SegmentsCostCache cache = prototype.build(); + + double origCost = 0; + for (DataSegment segment : dataSegments) { + origCost += CostBalancerStrategy.computeJointSegmentsCost(segment, referenceSegment); + } + + double cost = cache.cost(referenceSegment); + + Assert.assertEquals(1, origCost / cost, EPSILON); + } + + + private static Interval shiftedXHInterval(DateTime REFERENCE_TIME, int shiftInHours, int X) + { + return new Interval( + REFERENCE_TIME.plusHours(shiftInHours), + REFERENCE_TIME.plusHours(shiftInHours + X) + ); } private static Interval shifted1HInterval(DateTime REFERENCE_TIME, int shiftInHours) + { + return shiftedXHInterval(REFERENCE_TIME, shiftInHours, 1); + } + + private static Interval shiftedRandomInterval(DateTime REFERENCE_TIME, int shiftInHours) { return new Interval( REFERENCE_TIME.plusHours(shiftInHours), - REFERENCE_TIME.plusHours(shiftInHours + 1) + REFERENCE_TIME.plusHours(shiftInHours + RANDOM.nextInt(1000)) ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/cost/SegmentsCostCacheV3Test.java b/server/src/test/java/org/apache/druid/server/coordinator/cost/SegmentsCostCacheV3Test.java new file mode 100644 index 000000000000..3e7f847bcec2 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/cost/SegmentsCostCacheV3Test.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.cost; + +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.server.coordinator.CostBalancerStrategy; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.druid.server.coordinator.cost.SegmentsCostCacheV3.NORMALIZATION_FACTOR; + +public class SegmentsCostCacheV3Test +{ + + private static final Random RANDOM = new Random(23894); + private static final String DATA_SOURCE = "dataSource"; + private static final DateTime REFERENCE_TIME = DateTimes.of("2014-01-01T00:00:00"); + private static final double EPSILON = 0.0000001; + + @Test + public void notInCalculationIntervalCostTest() + { + SegmentsCostCacheV3.Builder cacheBuilder = SegmentsCostCacheV3.builder(); + cacheBuilder.addSegment( + createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100) + ); + SegmentsCostCacheV3 cache = cacheBuilder.build(); + Assert.assertEquals( + 0, + cache.cost(createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, (int) TimeUnit.DAYS.toHours(50)), 100)), + EPSILON + ); + } + + @Test + public void twoSegmentsCostTest() + { + DataSegment segmentA = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100); + DataSegment segmentB = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, -2), 100); + + SegmentsCostCacheV3.Bucket.Builder prototype = SegmentsCostCacheV3.Bucket.builder(new Interval( + REFERENCE_TIME.minusHours(5), + REFERENCE_TIME.plusHours(5) + )); + + prototype.addSegment(segmentA); + SegmentsCostCacheV3.Bucket bucket = prototype.build(); + + double segmentCost = bucket.cost(segmentB); + Assert.assertEquals(7.8735899489011E-4, segmentCost, EPSILON); + } + + @Test + public void calculationIntervalTest() + { + DataSegment segmentA = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100); + DataSegment segmentB = createSegment( + DATA_SOURCE, + shifted1HInterval(REFERENCE_TIME, (int) TimeUnit.DAYS.toHours(50)), + 100 + ); + + SegmentsCostCacheV3.Bucket.Builder prototype = SegmentsCostCacheV3.Bucket.builder( + new Interval(REFERENCE_TIME.minusHours(5), REFERENCE_TIME.plusHours(5)) + ); + prototype.addSegment(segmentA); + SegmentsCostCacheV3.Bucket bucket = prototype.build(); + + Assert.assertTrue(bucket.inCalculationInterval(segmentA)); + Assert.assertFalse(bucket.inCalculationInterval(segmentB)); + } + + @Test + public void sameSegmentCostTest() + { + DataSegment segmentA = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100); + DataSegment segmentB = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100); + + SegmentsCostCacheV3.Bucket.Builder prototype = SegmentsCostCacheV3.Bucket.builder(new Interval( + REFERENCE_TIME.minusHours(5), + REFERENCE_TIME.plusHours(5) + )); + + prototype.addSegment(segmentA); + SegmentsCostCacheV3.Bucket bucket = prototype.build(); + + double segmentCost = bucket.cost(segmentB); + Assert.assertEquals(8.26147353873985E-4, segmentCost, EPSILON); + } + + @Test + public void multipleSegmentsCostTest() + { + DataSegment segmentA = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, -2), 100); + DataSegment segmentB = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100); + DataSegment segmentC = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 2), 100); + + SegmentsCostCacheV3.Bucket.Builder prototype = SegmentsCostCacheV3.Bucket.builder(new Interval( + REFERENCE_TIME.minusHours(5), + REFERENCE_TIME.plusHours(5) + )); + + prototype.addSegment(segmentA); + prototype.addSegment(segmentC); + SegmentsCostCacheV3.Bucket bucket = prototype.build(); + + double segmentCost = bucket.cost(segmentB); + + Assert.assertEquals(0.001574717989780039, segmentCost, EPSILON); + } + + @Test + public void perfComparisonTest() + { + final int n = 100000; + + List dataSegments = new ArrayList<>(1000); + for (int i = 0; i < n; ++i) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedRandomInterval(REFERENCE_TIME, 24 * RANDOM.nextInt(60)), 100)); + } + + DataSegment referenceSegment = createSegment("ANOTHER_DATA_SOURCE", shiftedRandomInterval(REFERENCE_TIME, 5), 100); + + SegmentsCostCacheV3.Builder prototype = new SegmentsCostCacheV3.Builder(); + + long start; + long end; + + start = System.currentTimeMillis(); + dataSegments.forEach(prototype::addSegment); + SegmentsCostCacheV3 cache = prototype.build(); + end = System.currentTimeMillis(); + System.out.println("Insertion time for " + n + " segments: " + (end - start) + " ms"); + + start = System.currentTimeMillis(); + for (int i = 0; i < 1000; i++) { + getExpectedCost(dataSegments, referenceSegment); + } + end = System.currentTimeMillis(); + System.out.println("Avg cost time: " + (end - start) + " us"); + + start = System.currentTimeMillis(); + for (int i = 0; i < 1000; i++) { + cache.cost(referenceSegment); + } + end = System.currentTimeMillis(); + System.out.println("Avg new cache cost time: " + (end - start) + " us"); + + double expectedCost = getExpectedCost(dataSegments, referenceSegment); + double cost = cache.cost(referenceSegment); + + Assert.assertEquals(1, expectedCost / cost, EPSILON); + } + + @Test + public void bucketCorrectnessTest() + { + List dataSegments = new ArrayList<>(); + + // Same as reference interval + for (int i = 0; i < 100; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(20), 10), 100)); + } + + // Overlapping intervals of larger size that enclose the reference interval + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(40) - 70, 100), 100)); + } + + // intervals of small size that are enclosed within the reference interval + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(40) - 20, 1), 100)); + } + + // intervals not intersecting, lying to its left + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedRandomInterval(REFERENCE_TIME, -90), 100)); + } + + // intervals not intersecting, lying to its right + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedRandomInterval(REFERENCE_TIME, 60), 100)); + } + + DataSegment referenceSegment = createSegment("ANOTHER_DATA_SOURCE", shifted1HInterval(REFERENCE_TIME, 5), 100); + + SegmentsCostCacheV3.Bucket.Builder prototype = SegmentsCostCacheV3.Bucket.builder(new Interval( + REFERENCE_TIME.minusHours(90), REFERENCE_TIME.plusHours(90) + )); + + dataSegments.forEach(prototype::addSegment); + SegmentsCostCacheV3.Bucket bucket = prototype.build(); + + double expectedCost = getExpectedCost(dataSegments, referenceSegment); + + double cost = bucket.cost(referenceSegment); + + Assert.assertEquals(NORMALIZATION_FACTOR, expectedCost / cost, EPSILON); + } + + @Test + public void overallCorrectnessTest() + { + List dataSegments = new ArrayList<>(); + + // Same as reference interval + for (int i = 0; i < 100; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(20), 10), 100)); + } + + // Overlapping intervals of larger size that enclose the reference interval + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(40) - 70, 100), 100)); + } + + // intervals of small size that are enclosed within the reference interval + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, RANDOM.nextInt(40) - 20, 1), 100)); + } + + // intervals not intersecting, lying to its left + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedRandomInterval(REFERENCE_TIME, -90), 100)); + } + + // intervals not intersecting, lying to its right + for (int i = 0; i < 10; i++) { + dataSegments.add(createSegment(DATA_SOURCE, shiftedRandomInterval(REFERENCE_TIME, 60), 100)); + } + + DataSegment referenceSegment = createSegment("ANOTHER_DATA_SOURCE", shifted1HInterval(REFERENCE_TIME, 5), 100); + + SegmentsCostCacheV3.Builder prototype = new SegmentsCostCacheV3.Builder(); + + double expectedCost = getExpectedCost(dataSegments, referenceSegment); + + dataSegments.forEach(prototype::addSegment); + SegmentsCostCacheV3 cache = prototype.build(); + double cost = cache.cost(referenceSegment); + + Assert.assertEquals(1, expectedCost / cost, EPSILON); + } + + @Test + public void testLargeIntervals() + { + List intervals = new ArrayList<>(); + // add ALL granularity buckets + for (int i = 0; i < 5; i++) { + intervals.add(new Interval(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT)); + } + // add random large intervals + for (int i = 0; i < 15; i++) { + intervals.add(new Interval(REFERENCE_TIME.minusYears(RANDOM.nextInt(30)), + REFERENCE_TIME.plusYears(RANDOM.nextInt(30)))); + } + // add random medium intervals + for (int i = 0; i < 30; i++) { + intervals.add(new Interval(REFERENCE_TIME.minusWeeks(RANDOM.nextInt(30)), + REFERENCE_TIME.plusWeeks(RANDOM.nextInt(30)))); + } + // add random small intervals + for (int i = 0; i < 50; i++) { + intervals.add(new Interval(REFERENCE_TIME.minusHours(RANDOM.nextInt(30)), + REFERENCE_TIME.plusHours(RANDOM.nextInt(30)))); + } + + List segments = intervals.stream() + .map(interval -> createSegment(DATA_SOURCE, interval, 100)) + .collect(Collectors.toList()); + List referenceSegments = intervals.stream() + .map(interval -> createSegment("ANOTHER_DATA_SOURCE", interval, 100)) + .collect(Collectors.toList()); + + for (DataSegment segment : segments) { + for (DataSegment referenceSegment : referenceSegments) { + SegmentsCostCacheV3.Builder builder = SegmentsCostCacheV3.builder(); + builder.addSegment(segment); + SegmentsCostCacheV3 cache = builder.build(); + + double expectedCost = CostBalancerStrategy.computeJointSegmentsCost(segment, referenceSegment); + double cost = cache.cost(referenceSegment); + Assert.assertEquals(1, expectedCost / cost, 0.0001); + } + } + + SegmentsCostCacheV3.Builder builder = SegmentsCostCacheV3.builder(); + segments.forEach(builder::addSegment); + SegmentsCostCacheV3 cache = builder.build(); + for (DataSegment referenceSegment : referenceSegments) { + double expectedCost = getExpectedCost(segments, referenceSegment); + double cost = cache.cost(referenceSegment); + Assert.assertEquals(1, expectedCost / cost, 0.01); + } + } + + // ( ) [ ] + @Test + public void leftOfBucketTest() + { + DataSegment origin = createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, 0, 2), 100); + SegmentsCostCacheV3.Builder builder = SegmentsCostCacheV3.builder(); + builder.addSegment(origin); + SegmentsCostCacheV3 cache = builder.build(); + + DataSegment segment = createSegment("blah", shiftedXHInterval(REFERENCE_TIME, -3, 2), 100); + + double cost = cache.cost(segment); + double expectedCost = CostBalancerStrategy.computeJointSegmentsCost(origin, segment); + Assert.assertEquals(cost, expectedCost, EPSILON); + } + + // ( [ ) ] + @Test + public void leftOverlapWithBucketTest() + { + DataSegment origin = createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, 0, 2), 100); + SegmentsCostCacheV3.Builder builder = SegmentsCostCacheV3.builder(); + builder.addSegment(origin); + SegmentsCostCacheV3 cache = builder.build(); + + DataSegment segment = createSegment("blah", shiftedXHInterval(REFERENCE_TIME, -1, 2), 100); + + double cost = cache.cost(segment); + double expectedCost = CostBalancerStrategy.computeJointSegmentsCost(origin, segment); + Assert.assertEquals(cost, expectedCost, EPSILON); + } + + // ( [ ] ) + @Test + public void enclosedByBucketTest() + { + DataSegment origin = createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, 0, 4), 100); + SegmentsCostCacheV3.Builder builder = SegmentsCostCacheV3.builder(); + builder.addSegment(origin); + SegmentsCostCacheV3 cache = builder.build(); + + DataSegment segment = createSegment("blah", shiftedXHInterval(REFERENCE_TIME, 1, 2), 100); + + double cost = cache.cost(segment); + double expectedCost = CostBalancerStrategy.computeJointSegmentsCost(origin, segment); + Assert.assertEquals(cost, expectedCost, EPSILON); + } + + // [ ( ) ] + @Test + public void enclosesBucketTest() + { + DataSegment origin = createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, 0, 2), 100); + SegmentsCostCacheV3.Builder builder = SegmentsCostCacheV3.builder(); + builder.addSegment(origin); + SegmentsCostCacheV3 cache = builder.build(); + + DataSegment segment = createSegment("blah", shiftedXHInterval(REFERENCE_TIME, -1, 4), 100); + + double cost = cache.cost(segment); + double expectedCost = CostBalancerStrategy.computeJointSegmentsCost(origin, segment); + Assert.assertEquals(cost, expectedCost, EPSILON); + } + + // [ ( ] ) + @Test + public void rightOverlapWithBucketTest() + { + DataSegment origin = createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, 0, 2), 100); + SegmentsCostCacheV3.Builder builder = SegmentsCostCacheV3.builder(); + builder.addSegment(origin); + SegmentsCostCacheV3 cache = builder.build(); + + DataSegment segment = createSegment("blah", shiftedXHInterval(REFERENCE_TIME, 1, 2), 100); + + double cost = cache.cost(segment); + double expectedCost = CostBalancerStrategy.computeJointSegmentsCost(origin, segment); + Assert.assertEquals(cost, expectedCost, EPSILON); + } + + // [ ] ( ) + @Test + public void rightOfBucketTest() + { + DataSegment origin = createSegment(DATA_SOURCE, shiftedXHInterval(REFERENCE_TIME, 0, 2), 100); + SegmentsCostCacheV3.Builder builder = SegmentsCostCacheV3.builder(); + builder.addSegment(origin); + SegmentsCostCacheV3 cache = builder.build(); + + DataSegment segment = createSegment("blah", shiftedXHInterval(REFERENCE_TIME, 3, 2), 100); + + double cost = cache.cost(segment); + double expectedCost = CostBalancerStrategy.computeJointSegmentsCost(origin, segment); + Assert.assertEquals(cost, expectedCost, EPSILON); + } + + private static double getExpectedCost(List segments, DataSegment referenceSegment) + { + double cost = 0; + for (DataSegment segment : segments) { + cost += CostBalancerStrategy.computeJointSegmentsCost(segment, referenceSegment); + } + return cost; + } + + private static Interval shiftedXHInterval(DateTime REFERENCE_TIME, int shiftInHours, int X) + { + return new Interval( + REFERENCE_TIME.plusHours(shiftInHours), + REFERENCE_TIME.plusHours(shiftInHours + X) + ); + } + + private static Interval shifted1HInterval(DateTime REFERENCE_TIME, int shiftInHours) + { + return shiftedXHInterval(REFERENCE_TIME, shiftInHours, 1); + } + + private static Interval shiftedRandomInterval(DateTime REFERENCE_TIME, int shiftInHours) + { + return new Interval( + REFERENCE_TIME.plusHours(shiftInHours), + REFERENCE_TIME.plusHours(shiftInHours + RANDOM.nextInt(100)) + ); + } + + private static DataSegment createSegment(String dataSource, Interval interval, long size) + { + return new DataSegment( + dataSource, + interval, + UUID.randomUUID().toString(), + new ConcurrentHashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + null, + 0, + size + ); + } +}