diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index 4de1e05a5b69..bba3782b8547 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -311,6 +311,11 @@
+
+
+
+
+
diff --git a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index d06125ac35df..dfef1283a2a3 100644
--- a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -20,6 +20,8 @@
package org.apache.druid.server.coordinator;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordinator.helper.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.helper.CompactionSegmentSearchPolicy;
@@ -42,6 +44,7 @@
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -102,14 +105,10 @@ public void setup()
);
}
- dataSources = new HashMap<>();
+ List segments = new ArrayList<>();
for (int i = 0; i < numDataSources; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
- VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>(
- String.CASE_INSENSITIVE_ORDER
- );
-
final int startYear = ThreadLocalRandom.current().nextInt(2000, 2040);
DateTime date = DateTimes.of(startYear, 1, 1, 0, 0);
@@ -127,12 +126,11 @@ public void setup()
0,
segmentSizeBytes
);
- timeline.add(segment.getInterval(), segment.getVersion(), shardSpec.createChunk(segment));
+ segments.add(segment);
}
}
-
- dataSources.put(dataSource, timeline);
}
+ dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of()).getUsedSegmentsTimelinesPerDataSource();
}
@Benchmark
diff --git a/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java b/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java
index d5aa17ada9c5..569537a18a88 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java
@@ -123,6 +123,11 @@ public void error(Throwable t, String message, Object... formatArgs)
log.error(StringUtils.nonStrictFormat(message, formatArgs), t);
}
+ public void assertionError(String message, Object... formatArgs)
+ {
+ log.error("ASSERTION_ERROR: " + message, formatArgs);
+ }
+
public void wtf(String message, Object... formatArgs)
{
log.error(StringUtils.nonStrictFormat("WTF?!: " + message, formatArgs), new Exception());
diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java b/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
index 8e685c696117..ef5d47a1f4b5 100644
--- a/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
+++ b/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.emitter.service.AlertBuilder;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -61,15 +62,22 @@ public AlertBuilder makeAlert(String message, Object... objects)
return makeAlert(null, message, objects);
}
- public AlertBuilder makeAlert(Throwable t, String message, Object... objects)
+ public AlertBuilder makeAlert(@Nullable Throwable t, String message, Object... objects)
{
if (emitter == null) {
final String errorMessage = StringUtils.format(
- "Emitter not initialized! Cannot alert. Please make sure to call %s.registerEmitter()", this.getClass()
+ "Emitter not initialized! Cannot alert. Please make sure to call %s.registerEmitter()\n"
+ + "Message: %s",
+ this.getClass(),
+ StringUtils.nonStrictFormat(message, objects)
);
error(errorMessage);
- throw new ISE(errorMessage);
+ ISE e = new ISE(errorMessage);
+ if (t != null) {
+ e.addSuppressed(t);
+ }
+ throw e;
}
final AlertBuilder retVal = new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter)
diff --git a/core/src/main/java/org/apache/druid/timeline/TimelineLookup.java b/core/src/main/java/org/apache/druid/timeline/TimelineLookup.java
index af7544083756..6bdab5c723de 100644
--- a/core/src/main/java/org/apache/druid/timeline/TimelineLookup.java
+++ b/core/src/main/java/org/apache/druid/timeline/TimelineLookup.java
@@ -22,6 +22,7 @@
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.List;
@@ -50,5 +51,5 @@ public interface TimelineLookup
*/
List> lookupWithIncompletePartitions(Interval interval);
- PartitionHolder findEntry(Interval interval, VersionType version);
+ @Nullable PartitionHolder findEntry(Interval interval, VersionType version);
}
diff --git a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java
index 36c177dfb0f0..8deed36fd7e6 100644
--- a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java
+++ b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java
@@ -23,16 +23,18 @@
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
-import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.timeline.partition.ImmutablePartitionHolder;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -44,7 +46,9 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.StreamSupport;
/**
* VersionedIntervalTimeline is a data structure that manages objects on a specific timeline.
@@ -75,12 +79,11 @@ public class VersionedIntervalTimeline implements Timel
Comparators.intervalsByStartThenEnd()
);
private final Map> allTimelineEntries = new HashMap<>();
+ private final AtomicInteger numObjects = new AtomicInteger();
private final Comparator super VersionType> versionComparator;
- public VersionedIntervalTimeline(
- Comparator super VersionType> versionComparator
- )
+ public VersionedIntervalTimeline(Comparator super VersionType> versionComparator)
{
this.versionComparator = versionComparator;
}
@@ -92,7 +95,8 @@ public static VersionedIntervalTimeline forSegments(Iterabl
public static VersionedIntervalTimeline forSegments(Iterator segments)
{
- final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>(Ordering.natural());
+ final VersionedIntervalTimeline timeline =
+ new VersionedIntervalTimeline<>(Comparator.naturalOrder());
addSegments(timeline, segments);
return timeline;
}
@@ -115,6 +119,28 @@ public Map> getAllTimelineEntries(
return allTimelineEntries;
}
+ /**
+ * Returns a lazy collection with all objects (including overshadowed, see {@link #findOvershadowed}) in this
+ * VersionedIntervalTimeline to be used for iteration or {@link Collection#stream()} transformation. The order of
+ * objects in this collection is unspecified.
+ *
+ * Note: iteration over the returned collection may not be as trivially cheap as, for example, iteration over an
+ * ArrayList. Try (to some reasonable extent) to organize the code so that it iterates the returned collection only
+ * once rather than several times.
+ */
+ public Collection iterateAllObjects()
+ {
+ return CollectionUtils.createLazyCollectionFromStream(
+ () -> allTimelineEntries
+ .values()
+ .stream()
+ .flatMap((TreeMap entryMap) -> entryMap.values().stream())
+ .flatMap((TimelineEntry entry) -> StreamSupport.stream(entry.getPartitionHolder().spliterator(), false))
+ .map(PartitionChunk::getObject),
+ numObjects.get()
+ );
+ }
+
public void add(final Interval interval, VersionType version, PartitionChunk object)
{
addAll(Iterators.singletonIterator(object), o -> interval, o -> version);
@@ -143,15 +169,19 @@ private void addAll(
TreeMap versionEntry = new TreeMap<>(versionComparator);
versionEntry.put(version, entry);
allTimelineEntries.put(interval, versionEntry);
+ numObjects.incrementAndGet();
} else {
entry = exists.get(version);
if (entry == null) {
entry = new TimelineEntry(interval, version, new PartitionHolder<>(object));
exists.put(version, entry);
+ numObjects.incrementAndGet();
} else {
PartitionHolder partitionHolder = entry.getPartitionHolder();
- partitionHolder.add(object);
+ if (partitionHolder.add(object)) {
+ numObjects.incrementAndGet();
+ }
}
}
@@ -174,6 +204,7 @@ private void addAll(
}
}
+ @Nullable
public PartitionChunk remove(Interval interval, VersionType version, PartitionChunk chunk)
{
try {
@@ -189,7 +220,11 @@ public PartitionChunk remove(Interval interval, VersionType version,
return null;
}
- PartitionChunk retVal = entry.getPartitionHolder().remove(chunk);
+ PartitionChunk removedChunk = entry.getPartitionHolder().remove(chunk);
+ if (removedChunk == null) {
+ return null;
+ }
+ numObjects.decrementAndGet();
if (entry.getPartitionHolder().isEmpty()) {
versionEntries.remove(version);
if (versionEntries.isEmpty()) {
@@ -201,7 +236,7 @@ public PartitionChunk remove(Interval interval, VersionType version,
remove(completePartitionsTimeline, interval, entry, false);
- return retVal;
+ return removedChunk;
}
finally {
lock.writeLock().unlock();
@@ -209,7 +244,7 @@ public PartitionChunk remove(Interval interval, VersionType version,
}
@Override
- public PartitionHolder findEntry(Interval interval, VersionType version)
+ public @Nullable PartitionHolder findEntry(Interval interval, VersionType version)
{
try {
lock.readLock().lock();
@@ -217,9 +252,7 @@ public PartitionHolder findEntry(Interval interval, VersionType vers
if (entry.getKey().equals(interval) || entry.getKey().contains(interval)) {
TimelineEntry foundEntry = entry.getValue().get(version);
if (foundEntry != null) {
- return new ImmutablePartitionHolder(
- foundEntry.getPartitionHolder()
- );
+ return new ImmutablePartitionHolder<>(foundEntry.getPartitionHolder());
}
}
}
@@ -307,6 +340,10 @@ private TimelineObjectHolder timelineEntryToObjectHolde
);
}
+ /**
+ * This method should be deduplicated with DataSourcesSnapshot.determineOvershadowedSegments(): see
+ * https://github.com/apache/incubator-druid/issues/8070.
+ */
public Set> findOvershadowed()
{
try {
@@ -315,8 +352,8 @@ public Set> findOvershadowed()
Map> overShadowed = new HashMap<>();
for (Map.Entry> versionEntry : allTimelineEntries.entrySet()) {
- Map versionCopy = new HashMap<>();
- versionCopy.putAll(versionEntry.getValue());
+ @SuppressWarnings("unchecked")
+ Map versionCopy = (TreeMap) versionEntry.getValue().clone();
overShadowed.put(versionEntry.getKey(), versionCopy);
}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/ImmutablePartitionHolder.java b/core/src/main/java/org/apache/druid/timeline/partition/ImmutablePartitionHolder.java
index ec8f7d891a7e..5003f651c121 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/ImmutablePartitionHolder.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/ImmutablePartitionHolder.java
@@ -23,7 +23,7 @@
*/
public class ImmutablePartitionHolder extends PartitionHolder
{
- public ImmutablePartitionHolder(PartitionHolder partitionHolder)
+ public ImmutablePartitionHolder(PartitionHolder partitionHolder)
{
super(partitionHolder);
}
@@ -35,7 +35,7 @@ public PartitionChunk remove(PartitionChunk tPartitionChunk)
}
@Override
- public void add(PartitionChunk tPartitionChunk)
+ public boolean add(PartitionChunk tPartitionChunk)
{
throw new UnsupportedOperationException();
}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java b/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
index 396b4f3bdd63..dcf29aedc488 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
@@ -22,72 +22,62 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
+import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
-import java.util.SortedSet;
import java.util.Spliterator;
-import java.util.TreeSet;
+import java.util.TreeMap;
/**
* An object that clumps together multiple other objects which each represent a shard of some space.
*/
public class PartitionHolder implements Iterable>
{
- private final TreeSet> holderSet;
+ private final TreeMap, PartitionChunk> holderMap;
public PartitionHolder(PartitionChunk initialChunk)
{
- this.holderSet = new TreeSet<>();
+ this.holderMap = new TreeMap<>();
add(initialChunk);
}
public PartitionHolder(List> initialChunks)
{
- this.holderSet = new TreeSet<>();
+ this.holderMap = new TreeMap<>();
for (PartitionChunk chunk : initialChunks) {
add(chunk);
}
}
- public PartitionHolder(PartitionHolder partitionHolder)
+ public PartitionHolder(PartitionHolder partitionHolder)
{
- this.holderSet = new TreeSet<>();
- this.holderSet.addAll(partitionHolder.holderSet);
+ this.holderMap = new TreeMap<>();
+ this.holderMap.putAll(partitionHolder.holderMap);
}
- public void add(PartitionChunk chunk)
+ public boolean add(PartitionChunk chunk)
{
- holderSet.add(chunk);
+ return holderMap.putIfAbsent(chunk, chunk) == null;
}
+ @Nullable
public PartitionChunk remove(PartitionChunk chunk)
{
- if (!holderSet.isEmpty()) {
- // Somewhat funky implementation in order to return the removed object as it exists in the set
- SortedSet> tailSet = holderSet.tailSet(chunk, true);
- if (!tailSet.isEmpty()) {
- PartitionChunk element = tailSet.first();
- if (chunk.equals(element)) {
- holderSet.remove(element);
- return element;
- }
- }
- }
- return null;
+ return holderMap.remove(chunk);
}
public boolean isEmpty()
{
- return holderSet.isEmpty();
+ return holderMap.isEmpty();
}
public boolean isComplete()
{
- if (holderSet.isEmpty()) {
+ if (holderMap.isEmpty()) {
return false;
}
- Iterator> iter = holderSet.iterator();
+ Iterator> iter = holderMap.keySet().iterator();
PartitionChunk curr = iter.next();
@@ -117,7 +107,7 @@ public boolean isComplete()
public PartitionChunk getChunk(final int partitionNum)
{
final Iterator> retVal = Iterators.filter(
- holderSet.iterator(),
+ holderMap.keySet().iterator(),
input -> input.getChunkNumber() == partitionNum
);
@@ -127,13 +117,13 @@ public PartitionChunk getChunk(final int partitionNum)
@Override
public Iterator> iterator()
{
- return holderSet.iterator();
+ return holderMap.keySet().iterator();
}
@Override
public Spliterator> spliterator()
{
- return holderSet.spliterator();
+ return holderMap.keySet().spliterator();
}
public Iterable payloads()
@@ -153,7 +143,7 @@ public boolean equals(Object o)
PartitionHolder that = (PartitionHolder) o;
- if (!holderSet.equals(that.holderSet)) {
+ if (!holderMap.equals(that.holderMap)) {
return false;
}
@@ -163,14 +153,14 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
- return holderSet.hashCode();
+ return holderMap.hashCode();
}
@Override
public String toString()
{
return "PartitionHolder{" +
- "holderSet=" + holderSet +
+ "holderMap=" + holderMap +
'}';
}
}
diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
index af3cf077f430..deb4eafd0466 100644
--- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
+++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
@@ -19,13 +19,16 @@
package org.apache.druid.utils;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import java.util.AbstractCollection;
import java.util.Collection;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
+import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -72,6 +75,13 @@ public int size()
};
}
+ public static TreeSet newTreeSet(Comparator super E> comparator, Iterable elements)
+ {
+ TreeSet set = new TreeSet<>(comparator);
+ Iterables.addAll(set, elements);
+ return set;
+ }
+
/**
* Returns a transformed map from the given input map where the value is modified based on the given valueMapper
* function.
diff --git a/core/src/main/java/org/apache/druid/utils/package-info.java b/core/src/main/java/org/apache/druid/utils/package-info.java
new file mode 100644
index 000000000000..f41e226c8160
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/utils/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@EverythingIsNonnullByDefault
+package org.apache.druid.utils;
+
+import org.apache.druid.annotations.EverythingIsNonnullByDefault;
diff --git a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
index 681f6a605c5c..3e66bf53de79 100644
--- a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
@@ -240,22 +240,22 @@ public void testRemove()
public void testFindEntry()
{
Assert.assertEquals(
- new ImmutablePartitionHolder(new PartitionHolder(makeSingle(1))),
+ new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle(1))),
timeline.findEntry(Intervals.of("2011-10-01/2011-10-02"), "1")
);
Assert.assertEquals(
- new ImmutablePartitionHolder(new PartitionHolder(makeSingle(1))),
+ new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle(1))),
timeline.findEntry(Intervals.of("2011-10-01/2011-10-01T10"), "1")
);
Assert.assertEquals(
- new ImmutablePartitionHolder(new PartitionHolder(makeSingle(1))),
+ new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle(1))),
timeline.findEntry(Intervals.of("2011-10-01T02/2011-10-02"), "1")
);
Assert.assertEquals(
- new ImmutablePartitionHolder(new PartitionHolder(makeSingle(1))),
+ new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle(1))),
timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-01T17"), "1")
);
@@ -279,7 +279,7 @@ public void testFindEntryWithOverlap()
add("2011-01-02/2011-01-05", "2", 1);
Assert.assertEquals(
- new ImmutablePartitionHolder(new PartitionHolder(makeSingle(1))),
+ new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle(1))),
timeline.findEntry(Intervals.of("2011-01-02T02/2011-01-04"), "1")
);
}
@@ -1564,11 +1564,11 @@ public void testIsOvershadowedWithNonOverlappingSegmentsInTimeline()
{
timeline = makeStringIntegerTimeline();
- add("2011-04-05/2011-04-07", "1", new SingleElementPartitionChunk(1));
- add("2011-04-07/2011-04-09", "1", new SingleElementPartitionChunk(1));
+ add("2011-04-05/2011-04-07", "1", new SingleElementPartitionChunk<>(1));
+ add("2011-04-07/2011-04-09", "1", new SingleElementPartitionChunk<>(1));
- add("2011-04-15/2011-04-17", "1", new SingleElementPartitionChunk(1));
- add("2011-04-17/2011-04-19", "1", new SingleElementPartitionChunk(1));
+ add("2011-04-15/2011-04-17", "1", new SingleElementPartitionChunk<>(1));
+ add("2011-04-17/2011-04-19", "1", new SingleElementPartitionChunk<>(1));
Assert.assertFalse(timeline.isOvershadowed(Intervals.of("2011-04-01/2011-04-03"), "0"));
Assert.assertFalse(timeline.isOvershadowed(Intervals.of("2011-04-01/2011-04-05"), "0"));
@@ -1629,11 +1629,11 @@ public void testIsOvershadowedWithOverlappingSegmentsInTimeline()
{
timeline = makeStringIntegerTimeline();
- add("2011-04-05/2011-04-09", "11", new SingleElementPartitionChunk(1));
- add("2011-04-07/2011-04-11", "12", new SingleElementPartitionChunk(1));
+ add("2011-04-05/2011-04-09", "11", new SingleElementPartitionChunk<>(1));
+ add("2011-04-07/2011-04-11", "12", new SingleElementPartitionChunk<>(1));
- add("2011-04-15/2011-04-19", "12", new SingleElementPartitionChunk(1));
- add("2011-04-17/2011-04-21", "11", new SingleElementPartitionChunk(1));
+ add("2011-04-15/2011-04-19", "12", new SingleElementPartitionChunk<>(1));
+ add("2011-04-17/2011-04-21", "11", new SingleElementPartitionChunk<>(1));
Assert.assertFalse(timeline.isOvershadowed(Intervals.of("2011-04-01/2011-04-03"), "0"));
@@ -1730,13 +1730,13 @@ private Pair>> createExpected(
{
return Pair.of(
Intervals.of(intervalString),
- Pair.of(version, new PartitionHolder(values))
+ Pair.of(version, new PartitionHolder<>(values))
);
}
private SingleElementPartitionChunk makeSingle(Integer value)
{
- return new SingleElementPartitionChunk(value);
+ return new SingleElementPartitionChunk<>(value);
}
private void add(String interval, String version, Integer value)
@@ -1808,7 +1808,7 @@ public Pair>> apply(
private VersionedIntervalTimeline makeStringIntegerTimeline()
{
- return new VersionedIntervalTimeline(Ordering.natural());
+ return new VersionedIntervalTimeline<>(Ordering.natural());
}
}
diff --git a/docs/content/ingestion/delete-data.md b/docs/content/ingestion/delete-data.md
index 7e21e99bcc95..181fddb0ec63 100644
--- a/docs/content/ingestion/delete-data.md
+++ b/docs/content/ingestion/delete-data.md
@@ -37,7 +37,8 @@ A data deletion tutorial is available at [Tutorial: Deleting data](../tutorials/
## Kill Task
-Kill tasks delete all information about a segment and removes it from deep storage. Killable segments must be disabled (used==0) in the Druid segment table. The available grammar is:
+Kill tasks delete all information about a segment and remove it from deep storage. Segments to kill must be unused
+(used==0) in the Druid segment table. The available grammar is:
```json
{
diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md
index 3b49b12c5b1d..716d5e39f74d 100644
--- a/docs/content/operations/api-reference.md
+++ b/docs/content/operations/api-reference.md
@@ -113,15 +113,17 @@ Returns the serialized JSON of segments to load and drop for each Historical pro
* `/druid/coordinator/v1/metadata/datasources`
-Returns a list of the names of enabled datasources in the cluster.
+Returns a list of the names of data sources with at least one used segment in the cluster.
-* `/druid/coordinator/v1/metadata/datasources?includeDisabled`
+* `/druid/coordinator/v1/metadata/datasources?includeUnused`
-Returns a list of the names of enabled and disabled datasources in the cluster.
+Returns a list of the names of data sources, regardless of whether there are used segments belonging to those data
+sources in the cluster or not.
* `/druid/coordinator/v1/metadata/datasources?full`
-Returns a list of all enabled datasources with all metadata about those datasources as stored in the metadata store.
+Returns a list of all data sources with at least one used segment in the cluster. Returns all metadata about those data
+sources as stored in the metadata store.
* `/druid/coordinator/v1/metadata/datasources/{dataSourceName}`
@@ -229,11 +231,15 @@ Caution : Avoid using indexing or kill tasks and these API's at the same time fo
* `/druid/coordinator/v1/datasources/{dataSourceName}`
-Enables all segments of datasource which are not overshadowed by others.
+Marks as used all segments belonging to a data source. Returns a JSON object of the form
+`{"numChangedSegments": <number>}` with the number of segments in the database whose state has been changed (that is,
+the segments were marked as used) as the result of this API call.
* `/druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}`
-Enables a segment of a datasource.
+Marks as used a segment of a data source. Returns a JSON object of the form `{"segmentStateChanged": <boolean>}` with
+the boolean indicating if the state of the segment has been changed (that is, the segment was marked as used) as the
+result of this API call.
* `/druid/coordinator/v1/datasources/{dataSourceName}/markUsed`
@@ -259,7 +265,9 @@ JSON Request Payload:
* `/druid/coordinator/v1/datasources/{dataSourceName}`
-Disables a datasource.
+Marks as unused all segments belonging to a data source. Returns a JSON object of the form
+`{"numChangedSegments": <number>}` with the number of segments in the database whose state has been changed (that is,
+the segments were marked as unused) as the result of this API call.
* `/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}`
* `@Deprecated. /druid/coordinator/v1/datasources/{dataSourceName}?kill=true&interval={myInterval}`
@@ -268,7 +276,9 @@ Runs a [Kill task](../ingestion/tasks.html) for a given interval and datasource.
* `/druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}`
-Disables a segment.
+Marks as unused a segment of a data source. Returns a JSON object of the form `{"segmentStateChanged": <boolean>}` with
+the boolean indicating if the state of the segment has been changed (that is, the segment was marked as unused) as the
+result of this API call.
#### Retention Rules
@@ -595,7 +605,7 @@ Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_`
* `/druid/indexer/v1/worker`
-Retreives current overlord dynamic configuration.
+Retrieves current overlord dynamic configuration.
* `/druid/indexer/v1/worker/history?interval={interval}&counter={count}`
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 76883ea3f891..1ed80ca83c9a 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -390,7 +390,7 @@ Pair, Map>> checkSegment
// drop derivative segments which interval equals the interval in toDeleteBaseSegments
for (Interval interval : toDropInterval.keySet()) {
for (DataSegment segment : derivativeSegments.get(interval)) {
- segmentManager.removeSegment(segment.getId().toString());
+ segmentManager.markSegmentAsUnused(segment.getId().toString());
}
}
// data of the latest interval will be built firstly.
@@ -498,7 +498,7 @@ private void clearSegments()
{
log.info("Clear all metadata of dataSource %s", dataSource);
metadataStorageCoordinator.deletePendingSegments(dataSource, ALL_INTERVAL);
- segmentManager.removeDataSource(dataSource);
+ segmentManager.markAsUnusedAllSegmentsInDataSource(dataSource);
metadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index e766effb4125..07a507af140d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -91,6 +91,7 @@
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CircularBuffer;
+import org.apache.druid.utils.CollectionUtils;
import org.codehaus.plexus.util.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -444,8 +445,7 @@ public TaskStatus run(final TaskToolbox toolbox)
toolbox.getTaskActionClient(),
intervals
);
- versions = locks.entrySet().stream()
- .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getVersion()));
+ versions = CollectionUtils.mapValues(locks, TaskLock::getVersion);
dataSchema = ingestionSchema.getDataSchema().withGranularitySpec(
ingestionSchema.getDataSchema()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillTask.java
index 44c90bc8a47b..ac15e67e74b1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillTask.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import org.apache.druid.client.indexing.ClientKillQuery;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -37,6 +38,9 @@
import java.util.Map;
/**
+ * The client representation of this task is {@link ClientKillQuery}.
+ * JSON serialization fields of this class must correspond to those of {@link
+ * ClientKillQuery}, except for "id" and "context" fields.
*/
public class KillTask extends AbstractFixedIntervalTask
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java
index fa30dde802b8..c5f677121edf 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java
@@ -73,7 +73,9 @@ public void setup() throws IOException
expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));
- expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId().toString()));
+ expectedUnusedSegments.forEach(
+ s -> actionTestKit.getMetadataSegmentManager().markSegmentAsUnused(s.getId().toString())
+ );
}
private DataSegment createSegment(Interval interval, String version)
diff --git a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
index 84176621e4b7..865b8bd0363a 100644
--- a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
+++ b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
@@ -19,11 +19,13 @@
package org.apache.druid.client;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Ordering;
+import com.google.common.collect.Maps;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -31,40 +33,88 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
/**
- * An immutable snapshot of fields from {@link org.apache.druid.metadata.SQLMetadataSegmentManager} (dataSources and
- * overshadowedSegments). Getters of {@link org.apache.druid.metadata.MetadataSegmentManager} should use this snapshot
- * to return dataSources and overshadowedSegments.
+ * An immutable snapshot information about used segments and overshadowed segments for
+ * {@link org.apache.druid.metadata.SQLMetadataSegmentManager}.
*/
public class DataSourcesSnapshot
{
- private final Map dataSources;
+ public static DataSourcesSnapshot fromUsedSegments(
+ Iterable segments,
+ ImmutableMap dataSourceProperties
+ )
+ {
+ Map dataSources = new HashMap<>();
+ segments.forEach(segment -> {
+ dataSources
+ .computeIfAbsent(segment.getDataSource(), dsName -> new DruidDataSource(dsName, dataSourceProperties))
+ .addSegmentIfAbsent(segment);
+ });
+ return new DataSourcesSnapshot(CollectionUtils.mapValues(dataSources, DruidDataSource::toImmutableDruidDataSource));
+ }
+
+ public static DataSourcesSnapshot fromUsedSegmentsTimelines(
+ Map> usedSegmentsTimelinesPerDataSource,
+ ImmutableMap dataSourceProperties
+ )
+ {
+ Map dataSourcesWithAllUsedSegments =
+ Maps.newHashMapWithExpectedSize(usedSegmentsTimelinesPerDataSource.size());
+ usedSegmentsTimelinesPerDataSource.forEach(
+ (dataSourceName, usedSegmentsTimeline) -> {
+ DruidDataSource dataSource = new DruidDataSource(dataSourceName, dataSourceProperties);
+ usedSegmentsTimeline.iterateAllObjects().forEach(dataSource::addSegment);
+ dataSourcesWithAllUsedSegments.put(dataSourceName, dataSource.toImmutableDruidDataSource());
+ }
+ );
+ return new DataSourcesSnapshot(dataSourcesWithAllUsedSegments, usedSegmentsTimelinesPerDataSource);
+ }
+
+ private final Map dataSourcesWithAllUsedSegments;
+ private final Map> usedSegmentsTimelinesPerDataSource;
private final ImmutableSet overshadowedSegments;
- public DataSourcesSnapshot(
- Map dataSources
+ public DataSourcesSnapshot(Map dataSourcesWithAllUsedSegments)
+ {
+ this(
+ dataSourcesWithAllUsedSegments,
+ CollectionUtils.mapValues(
+ dataSourcesWithAllUsedSegments,
+ dataSource -> VersionedIntervalTimeline.forSegments(dataSource.getSegments())
+ )
+ );
+ }
+
+ private DataSourcesSnapshot(
+ Map dataSourcesWithAllUsedSegments,
+ Map> usedSegmentsTimelinesPerDataSource
)
{
- this.dataSources = dataSources;
+ this.dataSourcesWithAllUsedSegments = dataSourcesWithAllUsedSegments;
+ this.usedSegmentsTimelinesPerDataSource = usedSegmentsTimelinesPerDataSource;
this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments());
}
- public Collection getDataSources()
+ public Collection getDataSourcesWithAllUsedSegments()
{
- return dataSources.values();
+ return dataSourcesWithAllUsedSegments.values();
}
public Map getDataSourcesMap()
{
- return dataSources;
+ return dataSourcesWithAllUsedSegments;
}
@Nullable
public ImmutableDruidDataSource getDataSource(String dataSourceName)
{
- return dataSources.get(dataSourceName);
+ return dataSourcesWithAllUsedSegments.get(dataSourceName);
+ }
+
+ public Map> getUsedSegmentsTimelinesPerDataSource()
+ {
+ return usedSegmentsTimelinesPerDataSource;
}
public ImmutableSet getOvershadowedSegments()
@@ -72,40 +122,48 @@ public ImmutableSet getOvershadowedSegments()
return overshadowedSegments;
}
- @Nullable
- public Iterable iterateAllSegmentsInSnapshot()
+ /**
+ * Returns an iterable to go over all used segments in all data sources. The order in which segments are iterated
+ * is unspecified.
+ *
+ * Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try (to some
+ * reasonable extent) to organize the code so that it iterates the returned iterable only once rather than several
+ * times.
+ *
+ * This method's name starts with "iterate" because the result is expected to be consumed immediately in a for-each
+ * statement or a stream pipeline, like
+ * for (DataSegment segment : snapshot.iterateAllUsedSegmentsInSnapshot()) {...}
+ */
+ public Iterable iterateAllUsedSegmentsInSnapshot()
{
- if (dataSources == null) {
- return null;
- }
- return () -> dataSources.values().stream()
- .flatMap(dataSource -> dataSource.getSegments().stream())
- .iterator();
+ return () -> dataSourcesWithAllUsedSegments
+ .values()
+ .stream()
+ .flatMap(dataSource -> dataSource.getSegments().stream())
+ .iterator();
}
/**
- * This method builds timelines from all dataSources and finds the overshadowed segments list
+ * This method builds timelines from all data sources and finds the overshadowed segments list
+ *
+ * This method should be deduplicated with {@link VersionedIntervalTimeline#findOvershadowed()}: see
+ * https://github.com/apache/incubator-druid/issues/8070.
*
* @return overshadowed segment Ids list
*/
private List determineOvershadowedSegments()
{
- final List segments = dataSources.values().stream()
- .flatMap(ds -> ds.getSegments().stream())
- .collect(Collectors.toList());
- final Map> timelines = new HashMap<>();
- segments.forEach(segment -> timelines
- .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
- .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));
-
// It's fine to add all overshadowed segments to a single collection because only
// a small fraction of the segments in the cluster are expected to be overshadowed,
// so building this collection shouldn't generate a lot of garbage.
final List overshadowedSegments = new ArrayList<>();
- for (DataSegment dataSegment : segments) {
- final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource());
- if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
- overshadowedSegments.add(dataSegment.getId());
+ for (ImmutableDruidDataSource dataSource : dataSourcesWithAllUsedSegments.values()) {
+ VersionedIntervalTimeline usedSegmentsTimeline =
+ usedSegmentsTimelinesPerDataSource.get(dataSource.getName());
+ for (DataSegment segment : dataSource.getSegments()) {
+ if (usedSegmentsTimeline.isOvershadowed(segment.getInterval(), segment.getVersion())) {
+ overshadowedSegments.add(segment.getId());
+ }
}
}
return overshadowedSegments;
diff --git a/server/src/main/java/org/apache/druid/client/DruidDataSource.java b/server/src/main/java/org/apache/druid/client/DruidDataSource.java
index 7678dad68377..c293523b0797 100644
--- a/server/src/main/java/org/apache/druid/client/DruidDataSource.java
+++ b/server/src/main/java/org/apache/druid/client/DruidDataSource.java
@@ -30,12 +30,13 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
/**
* A mutable collection of metadata of segments ({@link DataSegment} objects), belonging to a particular data source.
*
- * Concurrency: could be updated concurrently via {@link #addSegment} and {@link #removeSegment}, and accessed
- * concurrently (e. g. via {@link #getSegments}) as well.
+ * Concurrency: could be updated concurrently via {@link #addSegment}, {@link #removeSegment}, and {@link
+ * #removeSegmentsIf}, and accessed concurrently (e. g. via {@link #getSegments}) as well.
*
* @see ImmutableDruidDataSource - an immutable counterpart of this class
*/
@@ -44,10 +45,7 @@ public class DruidDataSource
private final String name;
private final Map properties;
/**
- * This map needs to be concurrent because it should be possible to iterate the segments of the data source
- * (indirectly via {@link #getSegments} or in {@link #toString}) concurrently updates via {@link #addSegment} or
- * {@link #removeSegment}. Concurrent updates are also supported incidentally, though this is not needed for the use
- * cases of DruidDataSource.
+ * This map needs to be concurrent to support concurrent iteration and updates.
*/
private final ConcurrentMap idToSegmentMap = new ConcurrentHashMap<>();
@@ -80,6 +78,14 @@ public Collection getSegments()
return Collections.unmodifiableCollection(idToSegmentMap.values());
}
+ /**
+ * Removes segments for which the given filter returns true.
+ */
+ public void removeSegmentsIf(Predicate filter)
+ {
+ idToSegmentMap.values().removeIf(filter);
+ }
+
public DruidDataSource addSegment(DataSegment dataSegment)
{
idToSegmentMap.put(dataSegment.getId(), dataSegment);
@@ -99,7 +105,7 @@ public boolean addSegmentIfAbsent(DataSegment dataSegment)
* Returns the removed segment, or null if there was no segment with the given {@link SegmentId} in this
* DruidDataSource.
*/
- public DataSegment removeSegment(SegmentId segmentId)
+ public @Nullable DataSegment removeSegment(SegmentId segmentId)
{
return idToSegmentMap.remove(segmentId);
}
@@ -126,7 +132,6 @@ public String toString()
@Override
public boolean equals(Object o)
{
- //noinspection Contract
throw new UnsupportedOperationException("Use ImmutableDruidDataSource instead");
}
diff --git a/server/src/main/java/org/apache/druid/client/DruidServer.java b/server/src/main/java/org/apache/druid/client/DruidServer.java
index ac73d63febf3..d3bcb0dedcee 100644
--- a/server/src/main/java/org/apache/druid/client/DruidServer.java
+++ b/server/src/main/java/org/apache/druid/client/DruidServer.java
@@ -232,7 +232,6 @@ public DataSegment removeDataSegment(SegmentId segmentId)
segmentId
);
// Returning null from the lambda here makes the ConcurrentHashMap to not record any entry.
- //noinspection ReturnOfNull
return null;
}
DataSegment segment = dataSource.removeSegment(segmentId);
@@ -244,7 +243,6 @@ public DataSegment removeDataSegment(SegmentId segmentId)
log.warn("Asked to remove data segment that doesn't exist!? server[%s], segment[%s]", getName(), segmentId);
}
// Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
- //noinspection ReturnOfNull
return dataSource.isEmpty() ? null : dataSource;
}
);
diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
index 59539443b618..7ce3fd6dffb9 100644
--- a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
+++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
@@ -123,6 +123,8 @@ public String toString()
@Override
public boolean equals(Object o)
{
+ // Note: this method is not well-defined. It should instead just throw UnsupportedOperationException.
+ // See https://github.com/apache/incubator-druid/issues/7858.
if (this == o) {
return true;
}
@@ -146,6 +148,8 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
- return Objects.hash(name, properties, idToSegments);
+ // Note: this method is not well-defined. It should instead just throw UnsupportedOperationException.
+ // See https://github.com/apache/incubator-druid/issues/7858.
+ return Objects.hash(name, properties);
}
}
diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
index d01ad961a70c..605b2de877ff 100644
--- a/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
+++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
@@ -42,19 +42,19 @@ public class ImmutableDruidServer
private final DruidServerMetadata metadata;
private final long currSize;
private final ImmutableMap dataSources;
- private final int totalSegments;
+ private final int numSegments;
public ImmutableDruidServer(
DruidServerMetadata metadata,
long currSize,
ImmutableMap dataSources,
- int totalSegments
+ int numSegments
)
{
this.metadata = Preconditions.checkNotNull(metadata);
this.currSize = currSize;
this.dataSources = dataSources;
- this.totalSegments = totalSegments;
+ this.numSegments = numSegments;
}
public String getName()
@@ -128,23 +128,26 @@ public ImmutableDruidDataSource getDataSource(String name)
}
/**
- * Returns a lazy collection with all segments in all data sources, stored on this ImmutableDruidServer. The order
- * of segments in this collection is unspecified.
- *
- * Calling {@link Collection#size()} on the returned collection is cheap, O(1).
+ * Returns a lazy collection with all segments in all data sources stored on this ImmutableDruidServer to be used for
+ * iteration or {@link Collection#stream()} transformation. The order of segments in this collection is unspecified.
*
* Note: iteration over the returned collection may not be as trivially cheap as, for example, iteration over an
* ArrayList. Try (to some reasonable extent) to organize the code so that it iterates the returned collection only
* once rather than several times.
*/
- public Collection getLazyAllSegments()
+ public Collection iterateAllSegments()
{
return CollectionUtils.createLazyCollectionFromStream(
() -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()),
- totalSegments
+ numSegments
);
}
+ public int getNumSegments()
+ {
+ return numSegments;
+ }
+
public String getURL()
{
if (metadata.getHostAndTlsPort() != null) {
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index 695297dd2142..1a09c451e14d 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -29,6 +29,10 @@
import java.util.Map;
import java.util.Objects;
+/**
+ * Client representation of org.apache.druid.indexing.common.task.CompactionTask. JSON serialization fields of
+ * this class must correspond to those of org.apache.druid.indexing.common.task.CompactionTask.
+ */
public class ClientCompactQuery implements ClientQuery
{
private final String dataSource;
@@ -136,7 +140,7 @@ public int hashCode()
@Override
public String toString()
{
- return "ClientCompactQuery{" +
+ return getClass().getSimpleName() + "{" +
"dataSource='" + dataSource + '\'' +
", segments=" + segments +
", interval=" + interval +
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
index 068016952b4d..14cf45b55a8e 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
@@ -43,17 +43,17 @@ public class ClientCompactQueryTuningConfig
private final Long pushTimeout;
public static ClientCompactQueryTuningConfig from(
- @Nullable UserCompactTuningConfig userCompactTuningConfig,
+ @Nullable UserCompactTuningConfig userCompactionTaskQueryTuningConfig,
@Nullable Integer maxRowsPerSegment
)
{
return new ClientCompactQueryTuningConfig(
maxRowsPerSegment,
- userCompactTuningConfig == null ? null : userCompactTuningConfig.getMaxRowsInMemory(),
- userCompactTuningConfig == null ? null : userCompactTuningConfig.getMaxTotalRows(),
- userCompactTuningConfig == null ? null : userCompactTuningConfig.getIndexSpec(),
- userCompactTuningConfig == null ? null : userCompactTuningConfig.getMaxPendingPersists(),
- userCompactTuningConfig == null ? null : userCompactTuningConfig.getPushTimeout()
+ userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(),
+ userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxTotalRows(),
+ userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getIndexSpec(),
+ userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxPendingPersists(),
+ userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getPushTimeout()
);
}
@@ -150,7 +150,7 @@ public int hashCode()
@Override
public String toString()
{
- return "ClientCompactQueryTuningConfig{" +
+ return getClass().getSimpleName() + "{" +
"maxRowsPerSegment=" + maxRowsPerSegment +
", maxRowsInMemory=" + maxRowsInMemory +
", maxTotalRows=" + maxTotalRows +
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
index 06d88f9535a3..583dee506cda 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
@@ -24,6 +24,9 @@
import org.joda.time.Interval;
/**
+ * Client representation of org.apache.druid.indexing.common.task.KillTask. JSON serialization
+ * fields of this class must correspond to those of
+ * org.apache.druid.indexing.common.task.KillTask, except for "id" and "context" fields.
*/
public class ClientKillQuery implements ClientQuery
{
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
index aaa8b5c3ce3a..306d6e7c0cb2 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
@@ -24,7 +24,12 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
- * org.apache.druid.indexing.common.task.Task representation for clients
+ * org.apache.druid.indexing.common.task.Task representations for clients. The magic conversion happens right
+ * at the moment of making a REST query: {@link HttpIndexingServiceClient#runTask} serializes ClientTaskQuery
+ * objects and org.apache.druid.indexing.overlord.http.OverlordResource.taskPost() deserializes
+ * org.apache.druid.indexing.common.task.Task objects from the same bytes. Therefore JSON serialization fields of
+ * ClientTaskQuery objects must match with those of the corresponding
+ * org.apache.druid.indexing.common.task.Task objects.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
index db1fbef2499a..550dd49b0bf9 100644
--- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
@@ -24,6 +24,7 @@
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -32,89 +33,105 @@
import java.util.Set;
/**
+ * The difference between this class and org.apache.druid.sql.calcite.schema.MetadataSegmentView is that this
+ * class resides in Coordinator's memory, while org.apache.druid.sql.calcite.schema.MetadataSegmentView resides
+ * in Broker's memory.
*/
public interface MetadataSegmentManager
{
- void start();
+ void startPollingDatabasePeriodically();
- void stop();
+ void stopPollingDatabasePeriodically();
+
+ boolean isPollingDatabasePeriodically();
/**
- * Enables all segments for a dataSource which will not be overshadowed.
+ * Returns the number of segment entries in the database whose state was changed as the result of this call (that is,
+ * the segments were marked as used). If the call results in a database error, an exception is relayed to the caller.
*/
- boolean enableDataSource(String dataSource);
+ int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource);
+
+ int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval);
- boolean enableSegment(String segmentId);
+ int markAsUsedNonOvershadowedSegments(String dataSource, Set segmentIds)
+ throws UnknownSegmentIdException;
/**
- * Enables all segments contained in the interval which are not overshadowed by any currently enabled segments.
+ * Returns true if the state of the segment entry is changed in the database as the result of this call (that is, the
+ * segment was marked as used), false otherwise. If the call results in a database error, an exception is relayed to
+ * the caller.
*/
- int enableSegments(String dataSource, Interval interval);
+ boolean markSegmentAsUsed(String segmentId);
/**
- * Enables the segments passed which are not overshadowed by any currently enabled segments.
+ * Returns the number of segment entries in the database whose state was changed as the result of this call (that is,
+ * the segments were marked as unused). If the call results in a database error, an exception is relayed to the
+ * caller.
*/
- int enableSegments(String dataSource, Collection segmentIds);
+ int markAsUnusedAllSegmentsInDataSource(String dataSource);
+
+ int markAsUnusedSegmentsInInterval(String dataSource, Interval interval);
- boolean removeDataSource(String dataSource);
+ int markSegmentsAsUnused(String dataSource, Set segmentIds);
/**
- * Removes the given segmentId from metadata store. Returns true if one or more rows were affected.
+ * Returns true if the state of the segment entry is changed in the database as the result of this call (that is, the
+ * segment was marked as unused), false otherwise. If the call results in a database error, an exception is relayed to
+ * the caller.
*/
- boolean removeSegment(String segmentId);
+ boolean markSegmentAsUnused(String segmentId);
- long disableSegments(String dataSource, Collection segmentIds);
-
- int disableSegments(String dataSource, Interval interval);
+ /**
+ * If there are used segments belonging to the given data source this method returns them as an {@link
+ * ImmutableDruidDataSource} object. If there are no used segments belonging to the given data source this method
+ * returns null.
+ */
+ @Nullable ImmutableDruidDataSource getImmutableDataSourceWithUsedSegments(String dataSource);
- boolean isStarted();
+ /**
+ * Returns a set of {@link ImmutableDruidDataSource} objects containing information about all used segments. {@link
+ * ImmutableDruidDataSource} objects in the returned collection are unique. If there are no used segments, this method
+ * returns an empty collection.
+ */
+ Collection getImmutableDataSourcesWithAllUsedSegments();
- @Nullable
- ImmutableDruidDataSource getDataSource(String dataSourceName);
+ /**
+ * Returns a set of overshadowed segment ids.
+ */
+ Set getOvershadowedSegments();
/**
- * Returns a collection of known datasources.
- *
- * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
- * not yet been polled.)
+ * Returns a snapshot of DruidDataSources and overshadowed segments
*/
- @Nullable
- Collection getDataSources();
+ DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments();
/**
* Returns an iterable to go over all segments in all data sources. The order in which segments are iterated is
* unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
* (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
* several times.
- *
- * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
- * not yet been polled.)
*/
- @Nullable
- Iterable iterateAllSegments();
-
- Collection getAllDataSourceNames();
+ Iterable iterateAllUsedSegments();
/**
- * Returns a set of overshadowed segment Ids
+ * Retrieves all data source names for which there are segments in the database, regardless of whether those segments
+ * are used or not. Data source names in the returned collection are unique. If there are no segments in the database,
+ * returns an empty collection.
*
- * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
- * not yet been polled.)
- */
- @Nullable
- Set getOvershadowedSegments();
-
- /**
- * Returns a snapshot of DruidDataSources and overshadowed segments
+ * Performance warning: this method makes a query into the database.
*
+ * This method might return a different set of data source names than may be observed via {@link
+ * #getImmutableDataSourcesWithAllUsedSegments} method. This method will include a data source name even if there
+ * are no used segments belonging to it, while {@link #getImmutableDataSourcesWithAllUsedSegments} won't return
+ * such a data source.
*/
- @Nullable
- DataSourcesSnapshot getDataSourcesSnapshot();
+ Collection retrieveAllDataSourceNames();
/**
- * Returns top N unused segment intervals in given interval when ordered by segment start time, end time.
+ * Returns top N unused segment intervals with the end time no later than the specified maxEndTime when ordered by
+ * segment start time, end time.
*/
- List getUnusedSegmentIntervals(String dataSource, Interval interval, int limit);
+ List getUnusedSegmentIntervals(String dataSource, DateTime maxEndTime, int limit);
@VisibleForTesting
void poll();
diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManagerConfig.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManagerConfig.java
index 3f58cfa70982..d6c881198a89 100644
--- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManagerConfig.java
+++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManagerConfig.java
@@ -33,4 +33,9 @@ public Period getPollDuration()
{
return pollDuration;
}
+
+ public void setPollDuration(Period pollDuration)
+ {
+ this.pollDuration = pollDuration;
+ }
}
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
index be95669feb25..1f47e44811c7 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
@@ -200,7 +200,7 @@ public void run()
{
try {
// poll() is synchronized together with start() and stop() to ensure that when stop() exits, poll()
- // won't actually run anymore after that (it could only enter the syncrhonized section and exit
+ // won't actually run anymore after that (it could only enter the synchronized section and exit
// immediately because the localStartedOrder doesn't match the new currentStartOrder). It's needed
// to avoid flakiness in SQLMetadataRuleManagerTest.
// See https://github.com/apache/incubator-druid/issues/6028
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
index bb8514291df1..db1d19e49ae9 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
@@ -20,16 +20,20 @@
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.druid.client.DataSourcesSnapshot;
-import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.Pair;
@@ -41,18 +45,18 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
-import org.apache.druid.utils.CollectionUtils;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.skife.jdbi.v2.BaseResultSetMapper;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.FoldController;
-import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import javax.annotation.Nullable;
@@ -60,19 +64,19 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
/**
*
@@ -83,11 +87,48 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class);
/**
- * Use to synchronize {@link #start()}, {@link #stop()}, {@link #poll()}, and {@link #isStarted()}. These methods
- * should be synchronized to prevent from being called at the same time if two different threads are calling them.
- * This might be possible if a druid coordinator gets and drops leadership repeatedly in quick succession.
+ * Marker interface for objects stored in {@link #latestDatabasePoll}. See the comment for that field for details.
*/
- private final ReentrantReadWriteLock startStopLock = new ReentrantReadWriteLock();
+ private interface DatabasePoll
+ {}
+
+ /** Represents periodic {@link #poll}s happening from {@link #exec}. */
+ private static class PeriodicDatabasePoll implements DatabasePoll
+ {
+ /**
+ * This future allows to wait until {@link #dataSourcesSnapshot} is initialized in the first {@link #poll()}
+ * happening since {@link #startPollingDatabasePeriodically()} is called for the first time, or since the last
+ * visible (in happens-before terms) call to {@link #startPollingDatabasePeriodically()} in case of Coordinator's
+ * leadership changes.
+ */
+ final CompletableFuture firstPollCompletionFuture = new CompletableFuture<>();
+ }
+
+ /**
+ * Represents on-demand {@link #poll} initiated at periods of time when SqlSegmentsMetadata doesn't poll the database
+ * periodically.
+ */
+ private static class OnDemandDatabasePoll implements DatabasePoll
+ {
+ final long initiationTimeNanos = System.nanoTime();
+ final CompletableFuture pollCompletionFuture = new CompletableFuture<>();
+
+ long nanosElapsedFromInitiation()
+ {
+ return System.nanoTime() - initiationTimeNanos;
+ }
+ }
+
+ /**
+ * Use to synchronize {@link #startPollingDatabasePeriodically}, {@link #stopPollingDatabasePeriodically}, {@link
+ * #poll}, and {@link #isPollingDatabasePeriodically}. These methods should be synchronized to prevent from being
+ * called at the same time if two different threads are calling them. This might be possible if Coordinator gets and
+ * drops leadership repeatedly in quick succession.
+ *
+ * This lock is also used to synchronize {@link #awaitOrPerformDatabasePoll} for times when SqlSegmentsMetadata
+ * is not polling the database periodically (in other words, when the Coordinator is not the leader).
+ */
+ private final ReentrantReadWriteLock startStopPollLock = new ReentrantReadWriteLock();
/**
* Used to ensure that {@link #poll()} is never run concurrently. It should already be so (at least in production
@@ -95,41 +136,91 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
* scheduled in a single-threaded {@link #exec}, so this lock is an additional safety net in case there are bugs in
* the code, and for tests, where {@link #poll()} is called from the outside code.
*
- * Not using {@link #startStopLock}.writeLock() in order to still be able to run {@link #poll()} concurrently with
- * {@link #isStarted()}.
+ * Not using {@link #startStopPollLock}.writeLock() in order to still be able to run {@link #poll()} concurrently
+ * with {@link #isPollingDatabasePeriodically()}.
*/
private final Object pollLock = new Object();
private final ObjectMapper jsonMapper;
- private final Supplier config;
+ private final Duration periodicPollDelay;
private final Supplier dbTables;
private final SQLMetadataConnector connector;
- // Volatile since this reference is reassigned in "poll" and then read from in other threads.
- // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty dataSources map and
- // empty overshadowedSegments set).
- // Note that this is not simply a lazy-initialized variable: it starts off as null, and may transition between
- // null and nonnull multiple times as stop() and start() are called.
- @Nullable
- private volatile DataSourcesSnapshot dataSourcesSnapshot = null;
+ /**
+ * This field is made volatile to avoid "ghost secondary reads" that may result in NPE, see
+ * https://github.com/code-review-checklists/java-concurrency#safe-local-dcl (note that dataSourcesSnapshot resembles
+ * a lazily initialized field). Alternative is to always read the field in a snapshot local variable, but it's too
+ * easy to forget to do.
+ *
+ * This field may be updated from {@link #exec}, or from whatever thread calling {@link #doOnDemandPoll} via {@link
+ * #awaitOrPerformDatabasePoll()} via one of the public methods of SqlSegmentsMetadata.
+ */
+ private volatile @MonotonicNonNull DataSourcesSnapshot dataSourcesSnapshot = null;
/**
- * The number of times this SQLMetadataSegmentManager was started.
+ * The latest {@link DatabasePoll} represent {@link #poll()} calls which update {@link #dataSourcesSnapshot}, either
+ * periodically (see {@link PeriodicDatabasePoll}, {@link #startPollingDatabasePeriodically}, {@link
+ * #stopPollingDatabasePeriodically}) or "on demand" (see {@link OnDemandDatabasePoll}), when one of the methods that
+ * accesses {@link #dataSourcesSnapshot}'s state (such as {@link #getImmutableDataSourceWithUsedSegments}) is
+ * called when the Coordinator is not the leader and therefore SqlSegmentsMetadata isn't polling the database
+ * periodically.
+ *
+ * Note that if there is a happens-before relationship between a call to {@link #startPollingDatabasePeriodically()}
+ * (on Coordinators' leadership change) and one of the methods accessing the {@link #dataSourcesSnapshot}'s state in
+ * this class the latter is guaranteed to await for the initiated periodic poll. This is because when the latter
+ * method calls to {@link #awaitLatestDatabasePoll()} via {@link #awaitOrPerformDatabasePoll}, they will
+ * see the latest {@link PeriodicDatabasePoll} value (stored in this field, latestDatabasePoll, in {@link
+ * #startPollingDatabasePeriodically()}) and to await on its {@link PeriodicDatabasePoll#firstPollCompletionFuture}.
+ *
+ * However, the guarantee explained above doesn't make any actual semantic difference, because on both periodic and
+ * on-demand database polls the same invariant is maintained that the results not older than {@link
+ * #periodicPollDelay} are used. The main difference is in performance: since on-demand polls are irregular and happen
+ * in the context of the thread wanting to access the {@link #dataSourcesSnapshot}, that may cause delays in the
+ * logic. On the other hand, periodic polls are decoupled into {@link #exec} and {@link
+ * #dataSourcesSnapshot}-accessing methods should be generally "wait free" for database polls.
+ *
+ * The notion and the complexity of "on demand" database polls was introduced to simplify the interface of {@link
+ * MetadataSegmentManager} and guarantee that it always returns consistent and relatively up-to-date data from methods
+ * like {@link #getImmutableDataSourceWithUsedSegments}, while avoiding excessive repetitive polls. The last part
+ * is achieved via "hooking on" other polls by awaiting on {@link PeriodicDatabasePoll#firstPollCompletionFuture} or
+ * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #awaitOrPerformDatabasePoll} method
+ * implementation for details.
+ *
+ * Note: the overall implementation of periodic/on-demand polls is not completely optimal: for example, when the
+ * Coordinator just stopped leading, the latest periodic {@link #poll} (which is still "fresh") is not considered
+ * and a new on-demand poll is always initiated. This is done to simplify the implementation, while the efficiency
+ * during Coordinator leadership switches is not a priority.
+ *
+ * This field is {@code volatile} because it's checked and updated in a double-checked locking manner in {@link
+ * #awaitOrPerformDatabasePoll()}.
*/
- private long startCount = 0;
+ private volatile @Nullable DatabasePoll latestDatabasePoll = null;
+
+ /** Used to cancel periodic poll task in {@link #stopPollingDatabasePeriodically}. */
+ @GuardedBy("startStopPollLock")
+ private @Nullable Future> periodicPollTaskFuture = null;
+
+ /** The number of times {@link #startPollingDatabasePeriodically} was called. */
+ @GuardedBy("startStopPollLock")
+ private long startPollingCount = 0;
+
/**
- * Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if
+ * Equal to the current {@link #startPollingCount} value if the SqlSegmentsMetadata is currently started; -1 if
* currently stopped.
*
* This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
- * the theoretical situation of two or more tasks scheduled in {@link #start()} calling {@link #isStarted()} and
- * {@link #poll()} concurrently, if the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions
- * occurs quickly.
+ * the theoretical situation of two or more tasks scheduled in {@link #startPollingDatabasePeriodically()} calling
+ * {@link #isPollingDatabasePeriodically()} and {@link #poll()} concurrently, if the sequence of {@link
+ * #startPollingDatabasePeriodically()} - {@link #stopPollingDatabasePeriodically()} - {@link
+ * #startPollingDatabasePeriodically()} actions occurs quickly.
*
- * {@link SQLMetadataRuleManager} also have a similar issue.
+ * {@link SQLMetadataRuleManager} also has a similar issue.
*/
- private long currentStartOrder = -1;
- private ScheduledExecutorService exec = null;
+ @GuardedBy("startStopPollLock")
+ private long currentStartPollingOrder = -1;
+
+ @GuardedBy("startStopPollLock")
+ private @Nullable ScheduledExecutorService exec = null;
@Inject
public SQLMetadataSegmentManager(
@@ -140,33 +231,73 @@ public SQLMetadataSegmentManager(
)
{
this.jsonMapper = jsonMapper;
- this.config = config;
+ this.periodicPollDelay = config.get().getPollDuration().toStandardDuration();
this.dbTables = dbTables;
this.connector = connector;
}
- @Override
+ /**
+ * Don't confuse this method with {@link #startPollingDatabasePeriodically}. This is a lifecycle starting method to
+ * be executed just once for an instance of SqlSegmentsMetadata.
+ */
@LifecycleStart
public void start()
{
- ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+ lock.lock();
+ try {
+ if (exec != null) {
+ return; // Already started
+ }
+ exec = Execs.scheduledSingleThreaded(getClass().getName() + "-Exec--%d");
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Don't confuse this method with {@link #stopPollingDatabasePeriodically}. This is a lifecycle stopping method to
+ * be executed just once for an instance of SqlSegmentsMetadata.
+ */
+ @LifecycleStop
+ public void stop()
+ {
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+ lock.lock();
+ try {
+ exec.shutdownNow();
+ exec = null;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void startPollingDatabasePeriodically()
+ {
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
lock.lock();
try {
- if (isStarted()) {
+ if (exec == null) {
+ throw new IllegalStateException(getClass().getName() + " is not started");
+ }
+ if (isPollingDatabasePeriodically()) {
return;
}
- startCount++;
- currentStartOrder = startCount;
- final long localStartOrder = currentStartOrder;
+ PeriodicDatabasePoll periodicPollUpdate = new PeriodicDatabasePoll();
+ latestDatabasePoll = periodicPollUpdate;
- exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d");
+ startPollingCount++;
+ currentStartPollingOrder = startPollingCount;
+ final long localStartOrder = currentStartPollingOrder;
- final Duration delay = config.get().getPollDuration().toStandardDuration();
- exec.scheduleWithFixedDelay(
- createPollTaskForStartOrder(localStartOrder),
+ periodicPollTaskFuture = exec.scheduleWithFixedDelay(
+ createPollTaskForStartOrder(localStartOrder, periodicPollUpdate),
0,
- delay.getMillis(),
+ periodicPollDelay.getMillis(),
TimeUnit.MILLISECONDS
);
}
@@ -175,24 +306,33 @@ public void start()
}
}
- private Runnable createPollTaskForStartOrder(long startOrder)
+ private Runnable createPollTaskForStartOrder(long startOrder, PeriodicDatabasePoll periodicPollUpdate)
{
return () -> {
- // poll() is synchronized together with start(), stop() and isStarted() to ensure that when stop() exits, poll()
- // won't actually run anymore after that (it could only enter the syncrhonized section and exit immediately
- // because the localStartedOrder doesn't match the new currentStartOrder). It's needed to avoid flakiness in
- // SQLMetadataSegmentManagerTest. See https://github.com/apache/incubator-druid/issues/6028
- ReentrantReadWriteLock.ReadLock lock = startStopLock.readLock();
+ // poll() is synchronized together with startPollingDatabasePeriodically(), stopPollingDatabasePeriodically() and
+ // isPollingDatabasePeriodically() to ensure that when stopPollingDatabasePeriodically() exits, poll() won't
+ // actually run anymore after that (it could only enter the synchronized section and exit immediately because the
+ // localStartedOrder doesn't match the new currentStartPollingOrder). It's needed to avoid flakiness in
+ // SqlSegmentsMetadataTest. See https://github.com/apache/incubator-druid/issues/6028
+ ReentrantReadWriteLock.ReadLock lock = startStopPollLock.readLock();
lock.lock();
try {
- if (startOrder == currentStartOrder) {
+ if (startOrder == currentStartPollingOrder) {
poll();
+ periodicPollUpdate.firstPollCompletionFuture.complete(null);
} else {
- log.debug("startOrder = currentStartOrder = %d, skipping poll()", startOrder);
+ log.debug("startOrder = currentStartPollingOrder = %d, skipping poll()", startOrder);
}
}
- catch (Exception e) {
- log.makeAlert(e, "uncaught exception in segment manager polling thread").emit();
+ catch (Throwable t) {
+ log.makeAlert(t, "Uncaught exception in %s's polling thread", SQLMetadataSegmentManager.class).emit();
+ // Swallow the exception, so that scheduled polling goes on. Leave firstPollFutureSinceLastStart uncompleted
+ // for now, so that it may be completed during the next poll.
+ if (!(t instanceof Exception)) {
+ // Don't try to swallow a Throwable which is not an Exception (that is, a Error).
+ periodicPollUpdate.firstPollCompletionFuture.completeExceptionally(t);
+ throw t;
+ }
}
finally {
lock.unlock();
@@ -201,320 +341,435 @@ private Runnable createPollTaskForStartOrder(long startOrder)
}
@Override
- @LifecycleStop
- public void stop()
+ public boolean isPollingDatabasePeriodically()
+ {
+ // isPollingDatabasePeriodically() is synchronized together with startPollingDatabasePeriodically(),
+ // stopPollingDatabasePeriodically() and poll() to ensure that the latest currentStartPollingOrder is always
+ // visible. readLock should be used to avoid unexpected performance degradation of DruidCoordinator.
+ ReentrantReadWriteLock.ReadLock lock = startStopPollLock.readLock();
+ lock.lock();
+ try {
+ return currentStartPollingOrder >= 0;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void stopPollingDatabasePeriodically()
{
- ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
lock.lock();
try {
- if (!isStarted()) {
+ if (!isPollingDatabasePeriodically()) {
return;
}
- dataSourcesSnapshot = null;
- currentStartOrder = -1;
- exec.shutdownNow();
- exec = null;
+
+ periodicPollTaskFuture.cancel(false);
+ latestDatabasePoll = null;
+
+ // NOT nulling dataSourcesSnapshot, allowing to query the latest polled data even when this SegmentsMetadata
+ // object is stopped.
+
+ currentStartPollingOrder = -1;
}
finally {
lock.unlock();
}
}
- private Pair usedPayloadMapper(
- final int index,
- final ResultSet resultSet,
- final StatementContext context
- ) throws SQLException
+ private void awaitOrPerformDatabasePoll()
{
+ // Double-checked locking with awaitLatestDatabasePoll() call playing the role of the "check".
+ if (awaitLatestDatabasePoll()) {
+ return;
+ }
+ ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+ lock.lock();
try {
- return new Pair<>(
- jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class),
- resultSet.getBoolean("used")
- );
+ if (awaitLatestDatabasePoll()) {
+ return;
+ }
+ OnDemandDatabasePoll newOnDemandUpdate = new OnDemandDatabasePoll();
+ this.latestDatabasePoll = newOnDemandUpdate;
+ doOnDemandPoll(newOnDemandUpdate);
}
- catch (IOException e) {
- throw new RuntimeException(e);
+ finally {
+ lock.unlock();
}
}
/**
- * Gets a list of all datasegments that overlap the provided interval along with thier used status.
+ * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is
+ * made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise,
+ * meaning that a new on-demand database poll should be initiated.
*/
- private List> getDataSegmentsOverlappingInterval(
- final String dataSource,
- final Interval interval
- )
- {
- return connector.inReadOnlyTransaction(
- (handle, status) -> handle.createQuery(
- StringUtils.format(
- "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start",
- getSegmentsTable(),
- connector.getQuoteString()
- )
- )
- .setFetchSize(connector.getStreamingFetchSize())
- .bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString())
- .map(this::usedPayloadMapper)
- .list()
- );
- }
-
- private List> getDataSegments(
- final String dataSource,
- final Collection segmentIds,
- final Handle handle
- )
+ private boolean awaitLatestDatabasePoll()
{
- return segmentIds.stream().map(
- segmentId -> Optional.ofNullable(
- handle.createQuery(
- StringUtils.format(
- "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND id = :id",
- getSegmentsTable()
- )
- )
- .bind("dataSource", dataSource)
- .bind("id", segmentId)
- .map(this::usedPayloadMapper)
- .first()
- )
- .orElseThrow(() -> new UnknownSegmentIdException(StringUtils.format("Cannot find segment id [%s]", segmentId)))
- )
- .collect(Collectors.toList());
+ DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
+ if (latestDatabasePoll instanceof PeriodicDatabasePoll) {
+ Futures.getUnchecked(((PeriodicDatabasePoll) latestDatabasePoll).firstPollCompletionFuture);
+ return true;
+ }
+ if (latestDatabasePoll instanceof OnDemandDatabasePoll) {
+ long periodicPollDelayNanos = TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis());
+ OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll) latestDatabasePoll;
+ boolean latestUpdateIsFresh = latestOnDemandPoll.nanosElapsedFromInitiation() < periodicPollDelayNanos;
+ if (latestUpdateIsFresh) {
+ Futures.getUnchecked(latestOnDemandPoll.pollCompletionFuture);
+ return true;
+ }
+ // Latest on-demand update is not fresh. Fall through to return false from this method.
+ } else {
+ assert latestDatabasePoll == null;
+ // No periodic updates and no on-demand database poll have been done yet, nothing to await for.
+ }
+ return false;
}
- /**
- * Builds a VersionedIntervalTimeline containing used segments that overlap the intervals passed.
- */
- private VersionedIntervalTimeline buildVersionedIntervalTimeline(
- final String dataSource,
- final Collection intervals,
- final Handle handle
- )
+ private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll)
{
- return VersionedIntervalTimeline.forSegments(intervals
- .stream()
- .flatMap(interval -> handle.createQuery(
- StringUtils.format(
- "SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start AND used = true",
- getSegmentsTable(),
- connector.getQuoteString()
- )
- )
- .setFetchSize(connector.getStreamingFetchSize())
- .bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString())
- .map((i, resultSet, context) -> {
- try {
- return jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .list()
- .stream()
- )
- .iterator()
- );
+ try {
+ poll();
+ onDemandPoll.pollCompletionFuture.complete(null);
+ }
+ catch (Throwable t) {
+ onDemandPoll.pollCompletionFuture.completeExceptionally(t);
+ throw t;
+ }
}
@Override
- public boolean enableDataSource(final String dataSource)
+ public boolean markSegmentAsUsed(final String segmentId)
{
try {
- return enableSegments(dataSource, Intervals.ETERNITY) != 0;
+ int numUpdatedDatabaseEntries = connector.getDBI().withHandle(
+ (Handle handle) -> handle
+ .createStatement(StringUtils.format("UPDATE %s SET used=true WHERE id = :id", getSegmentsTable()))
+ .bind("id", segmentId)
+ .execute()
+ );
+ // Unlike bulk markAsUsed methods: markAsUsedAllNonOvershadowedSegmentsInDataSource(),
+ // markAsUsedNonOvershadowedSegmentsInInterval(), and markAsUsedNonOvershadowedSegments() we don't put the marked
+ // segment into the respective data source, because we don't have it fetched from the database. It's probably not
+ // worth complicating the implementation and making two database queries just to add the segment because it will
+ // be anyway fetched during the next poll(). Segment putting that is done in the bulk markAsUsed methods is a nice
+ // to have thing, but doesn't formally affects the external guarantees of SegmentsMetadata class.
+ return numUpdatedDatabaseEntries > 0;
}
- catch (Exception e) {
- log.error(e, "Exception enabling datasource %s", dataSource);
- return false;
+ catch (RuntimeException e) {
+ log.error(e, "Exception marking segment %s as used", segmentId);
+ throw e;
}
}
@Override
- public int enableSegments(final String dataSource, final Interval interval)
+ public int markAsUsedAllNonOvershadowedSegmentsInDataSource(final String dataSource)
{
- List> segments = getDataSegmentsOverlappingInterval(dataSource, interval);
- List segmentsToEnable = segments.stream()
- .filter(segment -> !segment.rhs && interval.contains(segment.lhs.getInterval()))
- .map(segment -> segment.lhs)
- .collect(Collectors.toList());
-
- VersionedIntervalTimeline versionedIntervalTimeline = VersionedIntervalTimeline.forSegments(
- segments.stream().filter(segment -> segment.rhs).map(segment -> segment.lhs).iterator()
- );
- VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, segmentsToEnable.iterator());
-
- return enableSegments(
- segmentsToEnable,
- versionedIntervalTimeline
- );
+ return doMarkAsUsedNonOvershadowedSegments(dataSource, null);
}
@Override
- public int enableSegments(final String dataSource, final Collection segmentIds)
+ public int markAsUsedNonOvershadowedSegmentsInInterval(final String dataSource, final Interval interval)
{
- Pair, VersionedIntervalTimeline> data = connector.inReadOnlyTransaction(
- (handle, status) -> {
- List segments = getDataSegments(dataSource, segmentIds, handle)
- .stream()
- .filter(pair -> !pair.rhs)
- .map(pair -> pair.lhs)
- .collect(Collectors.toList());
-
- VersionedIntervalTimeline versionedIntervalTimeline = buildVersionedIntervalTimeline(
- dataSource,
- JodaUtils.condenseIntervals(segments.stream().map(segment -> segment.getInterval()).collect(Collectors.toList())),
- handle
- );
- VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, segments.iterator());
+ Preconditions.checkNotNull(interval);
+ return doMarkAsUsedNonOvershadowedSegments(dataSource, interval);
+ }
- return new Pair<>(
- segments,
- versionedIntervalTimeline
- );
+ /**
+ * Implementation for both {@link #markAsUsedAllNonOvershadowedSegmentsInDataSource} (if the given interval is null)
+ * and {@link #markAsUsedNonOvershadowedSegmentsInInterval}.
+ */
+ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval)
+ {
+ List usedSegmentsOverlappingInterval = new ArrayList<>();
+ List unusedSegmentsInInterval = new ArrayList<>();
+ connector.inReadOnlyTransaction(
+ (handle, status) -> {
+ String queryString =
+ StringUtils.format("SELECT used, payload FROM %1$s WHERE dataSource = :dataSource", getSegmentsTable());
+ if (interval != null) {
+ queryString += StringUtils.format(" AND start < :end AND %1$send%1$s > :start", connector.getQuoteString());
+ }
+ Query> query = handle
+ .createQuery(queryString)
+ .setFetchSize(connector.getStreamingFetchSize())
+ .bind("dataSource", dataSourceName);
+ if (interval != null) {
+ query = query
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString());
+ }
+ query = query
+ .map((int index, ResultSet resultSet, StatementContext context) -> {
+ try {
+ DataSegment segment = jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
+ if (resultSet.getBoolean("used")) {
+ usedSegmentsOverlappingInterval.add(segment);
+ } else {
+ if (interval == null || interval.contains(segment.getInterval())) {
+ unusedSegmentsInInterval.add(segment);
+ }
+ }
+ return null;
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // Consume the query results to ensure usedSegmentsOverlappingInterval and unusedSegmentsInInterval are
+ // populated.
+ consume(query.iterator());
+ return null;
}
);
- return enableSegments(
- data.lhs,
- data.rhs
+ VersionedIntervalTimeline versionedIntervalTimeline = VersionedIntervalTimeline.forSegments(
+ Iterators.concat(usedSegmentsOverlappingInterval.iterator(), unusedSegmentsInInterval.iterator())
);
+
+ return markNonOvershadowedSegmentsAsUsed(unusedSegmentsInInterval, versionedIntervalTimeline);
}
- private int enableSegments(
- final Collection segments,
- final VersionedIntervalTimeline versionedIntervalTimeline
+ private static void consume(Iterator> iterator)
+ {
+ while (iterator.hasNext()) {
+ iterator.next();
+ }
+ }
+
+ private int markNonOvershadowedSegmentsAsUsed(
+ List unusedSegments,
+ VersionedIntervalTimeline timeline
)
{
- if (segments.isEmpty()) {
- log.warn("No segments found to update!");
- return 0;
+ List segmentIdsToMarkAsUsed = new ArrayList<>();
+ for (DataSegment segment : unusedSegments) {
+ if (timeline.isOvershadowed(segment.getInterval(), segment.getVersion())) {
+ continue;
+ }
+ segmentIdsToMarkAsUsed.add(segment.getId().toString());
}
- return connector.getDBI().withHandle(handle -> {
- Batch batch = handle.createBatch();
- segments
- .stream()
- .map(segment -> segment.getId())
- .filter(segmentId -> !versionedIntervalTimeline.isOvershadowed(
- segmentId.getInterval(),
- segmentId.getVersion()
- ))
- .forEach(segmentId -> batch.add(
- StringUtils.format(
- "UPDATE %s SET used=true WHERE id = '%s'",
- getSegmentsTable(),
- segmentId
- )
- ));
- return batch.execute().length;
- });
+ return markSegmentsAsUsed(segmentIdsToMarkAsUsed);
}
@Override
- public boolean enableSegment(final String segmentId)
+ public int markAsUsedNonOvershadowedSegments(final String dataSource, final Set segmentIds)
+ throws UnknownSegmentIdException
{
try {
- connector.getDBI().withHandle(
- new HandleCallback()
- {
- @Override
- public Void withHandle(Handle handle)
- {
- handle.createStatement(StringUtils.format("UPDATE %s SET used=true WHERE id = :id", getSegmentsTable()))
- .bind("id", segmentId)
- .execute();
- return null;
- }
- }
- );
+ Pair, VersionedIntervalTimeline> unusedSegmentsAndTimeline = connector
+ .inReadOnlyTransaction(
+ (handle, status) -> {
+ List unusedSegments = retrieveUnusedSegments(dataSource, segmentIds, handle);
+ List unusedSegmentsIntervals = JodaUtils.condenseIntervals(
+ unusedSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+ );
+ Iterator usedSegmentsOverlappingUnusedSegmentsIntervals =
+ retrieveUsedSegmentsOverlappingIntervals(dataSource, unusedSegmentsIntervals, handle);
+ VersionedIntervalTimeline timeline = VersionedIntervalTimeline.forSegments(
+ Iterators.concat(usedSegmentsOverlappingUnusedSegmentsIntervals, unusedSegments.iterator())
+ );
+ return new Pair<>(unusedSegments, timeline);
+ }
+ );
+
+ List unusedSegments = unusedSegmentsAndTimeline.lhs;
+ VersionedIntervalTimeline timeline = unusedSegmentsAndTimeline.rhs;
+ return markNonOvershadowedSegmentsAsUsed(unusedSegments, timeline);
}
catch (Exception e) {
- log.error(e, "Exception enabling segment %s", segmentId);
- return false;
+ Throwable rootCause = Throwables.getRootCause(e);
+ if (rootCause instanceof UnknownSegmentIdException) {
+ throw (UnknownSegmentIdException) rootCause;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ private List retrieveUnusedSegments(
+ final String dataSource,
+ final Set segmentIds,
+ final Handle handle
+ ) throws UnknownSegmentIdException
+ {
+ List unknownSegmentIds = new ArrayList<>();
+ List segments = segmentIds
+ .stream()
+ .map(
+ segmentId -> {
+ Iterator segmentResultIterator = handle
+ .createQuery(
+ StringUtils.format(
+ "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND id = :id",
+ getSegmentsTable()
+ )
+ )
+ .bind("dataSource", dataSource)
+ .bind("id", segmentId)
+ .map((int index, ResultSet resultSet, StatementContext context) -> {
+ try {
+ if (!resultSet.getBoolean("used")) {
+ return jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
+ } else {
+ // We emit nulls for used segments. They are filtered out below in this method.
+ return null;
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .iterator();
+ if (!segmentResultIterator.hasNext()) {
+ unknownSegmentIds.add(segmentId);
+ return null;
+ } else {
+ @Nullable DataSegment segment = segmentResultIterator.next();
+ if (segmentResultIterator.hasNext()) {
+ log.error(
+ "There is more than one row corresponding to segment id [%s] in data source [%s] in the database",
+ segmentId,
+ dataSource
+ );
+ }
+ return segment;
+ }
+ }
+ )
+ .filter(Objects::nonNull) // Filter nulls corresponding to used segments.
+ .collect(Collectors.toList());
+ if (!unknownSegmentIds.isEmpty()) {
+ throw new UnknownSegmentIdException(unknownSegmentIds);
+ }
+ return segments;
+ }
+
+ private Iterator retrieveUsedSegmentsOverlappingIntervals(
+ final String dataSource,
+ final Collection intervals,
+ final Handle handle
+ )
+ {
+ return intervals
+ .stream()
+ .flatMap(interval -> {
+ Iterable segmentResultIterable = () -> handle
+ .createQuery(
+ StringUtils.format(
+ "SELECT payload FROM %1$s "
+ + "WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start AND used = true",
+ getSegmentsTable(),
+ connector.getQuoteString()
+ )
+ )
+ .setFetchSize(connector.getStreamingFetchSize())
+ .bind("dataSource", dataSource)
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString())
+ .map((int index, ResultSet resultSet, StatementContext context) -> {
+ try {
+ return jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .iterator();
+ return StreamSupport.stream(segmentResultIterable.spliterator(), false);
+ })
+ .iterator();
+ }
+
+ private int markSegmentsAsUsed(final List segmentIds)
+ {
+ if (segmentIds.isEmpty()) {
+ log.info("No segments found to update!");
+ return 0;
}
- return true;
+ return connector.getDBI().withHandle(handle -> {
+ Batch batch = handle.createBatch();
+ segmentIds.forEach(segmentId -> batch.add(
+ StringUtils.format("UPDATE %s SET used=true WHERE id = '%s'", getSegmentsTable(), segmentId)
+ ));
+ int[] segmentChanges = batch.execute();
+ return computeNumChangedSegments(segmentIds, segmentChanges);
+ });
}
@Override
- public boolean removeDataSource(final String dataSource)
+ public int markAsUnusedAllSegmentsInDataSource(final String dataSource)
{
try {
- final int removed = connector.getDBI().withHandle(
- handle -> handle.createStatement(
- StringUtils.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable())
- ).bind("dataSource", dataSource).execute()
+ final int numUpdatedDatabaseEntries = connector.getDBI().withHandle(
+ (Handle handle) -> handle
+ .createStatement(
+ StringUtils.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable())
+ )
+ .bind("dataSource", dataSource)
+ .execute()
);
- if (removed == 0) {
- return false;
- }
+ return numUpdatedDatabaseEntries;
}
- catch (Exception e) {
- log.error(e, "Error removing datasource %s", dataSource);
- return false;
+ catch (RuntimeException e) {
+ log.error(e, "Exception marking all segments as unused in data source [%s]", dataSource);
+ throw e;
}
-
- return true;
}
/**
- * This method does not update {@code dataSourcesSnapshot}, see the comments in {@code doPoll()} about
- * snapshot update. The segment removal will be reflected after next poll cyccle runs.
+ * This method does not update {@link #dataSourcesSnapshot}, see the comments in {@link #doPoll()} about
+ * snapshot update. The update of the segment's state will be reflected after the next {@link DatabasePoll}.
*/
@Override
- public boolean removeSegment(String segmentId)
+ public boolean markSegmentAsUnused(final String segmentId)
{
try {
- return removeSegmentFromTable(segmentId);
+ return markSegmentAsUnusedInDatabase(segmentId);
}
- catch (Exception e) {
- log.error(e, e.toString());
- return false;
+ catch (RuntimeException e) {
+ log.error(e, "Exception marking segment [%s] as unused", segmentId);
+ throw e;
}
}
@Override
- public long disableSegments(String dataSource, Collection segmentIds)
+ public int markSegmentsAsUnused(String dataSourceName, Set segmentIds)
{
if (segmentIds.isEmpty()) {
return 0;
}
- final long[] result = new long[1];
+ final List segmentIdList = new ArrayList<>(segmentIds);
try {
- connector.getDBI().withHandle(handle -> {
+ return connector.getDBI().withHandle(handle -> {
Batch batch = handle.createBatch();
- segmentIds
- .forEach(segmentId -> batch.add(
- StringUtils.format(
- "UPDATE %s SET used=false WHERE datasource = '%s' AND id = '%s' ",
- getSegmentsTable(),
- dataSource,
- segmentId
- )
- ));
- final int[] resultArr = batch.execute();
- result[0] = Arrays.stream(resultArr).filter(x -> x > 0).count();
- return result[0];
+ segmentIdList.forEach(segmentId -> batch.add(
+ StringUtils.format(
+ "UPDATE %s SET used=false WHERE datasource = '%s' AND id = '%s'",
+ getSegmentsTable(),
+ dataSourceName,
+ segmentId
+ )
+ ));
+ final int[] segmentChanges = batch.execute();
+ return computeNumChangedSegments(segmentIdList, segmentChanges);
});
}
catch (Exception e) {
throw new RuntimeException(e);
}
- return result[0];
}
@Override
- public int disableSegments(String dataSource, Interval interval)
+ public int markAsUnusedSegmentsInInterval(String dataSourceName, Interval interval)
{
try {
- return connector.getDBI().withHandle(
+ Integer numUpdatedDatabaseEntries = connector.getDBI().withHandle(
handle -> handle
.createStatement(
StringUtils
@@ -524,116 +779,121 @@ public int disableSegments(String dataSource, Interval interval)
getSegmentsTable(),
connector.getQuoteString()
))
- .bind("datasource", dataSource)
+ .bind("datasource", dataSourceName)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.execute()
);
+ return numUpdatedDatabaseEntries;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
- private boolean removeSegmentFromTable(String segmentId)
+ private boolean markSegmentAsUnusedInDatabase(String segmentId)
{
- final int removed = connector.getDBI().withHandle(
+ final int numUpdatedRows = connector.getDBI().withHandle(
handle -> handle
.createStatement(StringUtils.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable()))
.bind("segmentID", segmentId)
.execute()
);
- return removed > 0;
+ if (numUpdatedRows < 0) {
+ log.assertionError(
+ "Negative number of rows updated for segment id [%s]: %d",
+ segmentId,
+ numUpdatedRows
+ );
+ } else if (numUpdatedRows > 1) {
+ log.error(
+ "More than one row updated for segment id [%s]: %d, "
+ + "there may be more than one row for the segment id in the database",
+ segmentId,
+ numUpdatedRows
+ );
+ }
+ return numUpdatedRows > 0;
}
- @Override
- public boolean isStarted()
+ private static int computeNumChangedSegments(List segmentIds, int[] segmentChanges)
{
- // isStarted() is synchronized together with start(), stop() and poll() to ensure that the latest currentStartOrder
- // is always visible. readLock should be used to avoid unexpected performance degradation of DruidCoordinator.
- ReentrantReadWriteLock.ReadLock lock = startStopLock.readLock();
- lock.lock();
- try {
- return currentStartOrder >= 0;
- }
- finally {
- lock.unlock();
+ int numChangedSegments = 0;
+ for (int i = 0; i < segmentChanges.length; i++) {
+ int numUpdatedRows = segmentChanges[i];
+ if (numUpdatedRows < 0) {
+ log.assertionError(
+ "Negative number of rows updated for segment id [%s]: %d",
+ segmentIds.get(i),
+ numUpdatedRows
+ );
+ } else if (numUpdatedRows > 1) {
+ log.error(
+ "More than one row updated for segment id [%s]: %d, "
+ + "there may be more than one row for the segment id in the database",
+ segmentIds.get(i),
+ numUpdatedRows
+ );
+ }
+ if (numUpdatedRows > 0) {
+ numChangedSegments += 1;
+ }
}
+ return numChangedSegments;
}
@Override
- @Nullable
- public ImmutableDruidDataSource getDataSource(String dataSourceName)
+ public @Nullable ImmutableDruidDataSource getImmutableDataSourceWithUsedSegments(String dataSourceName)
{
- final ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot)
- .map(m -> m.getDataSourcesMap().get(dataSourceName))
- .orElse(null);
- return dataSource == null ? null : dataSource;
+ return getSnapshotOfDataSourcesWithAllUsedSegments().getDataSource(dataSourceName);
}
@Override
- @Nullable
- public Collection getDataSources()
+ public Collection getImmutableDataSourcesWithAllUsedSegments()
{
- return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getDataSources()).orElse(null);
+ return getSnapshotOfDataSourcesWithAllUsedSegments().getDataSourcesWithAllUsedSegments();
}
@Override
- @Nullable
- public Iterable iterateAllSegments()
+ public Set getOvershadowedSegments()
{
- final Collection dataSources = Optional.ofNullable(dataSourcesSnapshot)
- .map(m -> m.getDataSources())
- .orElse(null);
- if (dataSources == null) {
- return null;
- }
-
- return () -> dataSources.stream()
- .flatMap(dataSource -> dataSource.getSegments().stream())
- .iterator();
+ return getSnapshotOfDataSourcesWithAllUsedSegments().getOvershadowedSegments();
}
@Override
- @Nullable
- public Set getOvershadowedSegments()
+ public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments()
{
- return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getOvershadowedSegments()).orElse(null);
+ awaitOrPerformDatabasePoll();
+ return dataSourcesSnapshot;
}
- @Nullable
@Override
- public DataSourcesSnapshot getDataSourcesSnapshot()
+ public Iterable iterateAllUsedSegments()
{
- return dataSourcesSnapshot;
+ awaitOrPerformDatabasePoll();
+ return () -> dataSourcesSnapshot
+ .getDataSourcesWithAllUsedSegments()
+ .stream()
+ .flatMap(dataSource -> dataSource.getSegments().stream())
+ .iterator();
}
@Override
- public Collection getAllDataSourceNames()
+ public Collection retrieveAllDataSourceNames()
{
return connector.getDBI().withHandle(
- handle -> handle.createQuery(
- StringUtils.format("SELECT DISTINCT(datasource) FROM %s", getSegmentsTable())
- )
- .fold(
- new ArrayList<>(),
- new Folder3, Map>()
- {
- @Override
- public List fold(
- List druidDataSources,
- Map stringObjectMap,
- FoldController foldController,
- StatementContext statementContext
- )
- {
- druidDataSources.add(
- MapUtils.getString(stringObjectMap, "datasource")
- );
- return druidDataSources;
- }
- }
- )
+ handle -> handle
+ .createQuery(StringUtils.format("SELECT DISTINCT(datasource) FROM %s", getSegmentsTable()))
+ .fold(
+ new ArrayList<>(),
+ (List druidDataSources,
+ Map stringObjectMap,
+ FoldController foldController,
+ StatementContext statementContext) -> {
+ druidDataSources.add(MapUtils.getString(stringObjectMap, "datasource"));
+ return druidDataSources;
+ }
+ )
);
}
@@ -642,15 +902,12 @@ public void poll()
{
// See the comment to the pollLock field, explaining this synchronized block
synchronized (pollLock) {
- try {
- doPoll();
- }
- catch (Exception e) {
- log.makeAlert(e, "Problem polling DB.").emit();
- }
+ doPoll();
}
}
+ /** This method is extracted from {@link #poll()} solely to reduce code nesting. */
+ @GuardedBy("pollLock")
private void doPoll()
{
log.debug("Starting polling of segment table");
@@ -681,6 +938,8 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
}
catch (IOException e) {
log.makeAlert(e, "Failed to read segment from db.").emit();
+      // If one entry in the database is corrupted, doPoll() should continue to work overall. See
+ // filter by `Objects::nonNull` below in this method.
return null;
}
}
@@ -692,54 +951,49 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
);
if (segments == null || segments.isEmpty()) {
- log.warn("No segments found in the database!");
+ log.info("No segments found in the database!");
return;
}
log.info("Polled and found %,d segments in the database", segments.size());
- ConcurrentHashMap newDataSources = new ConcurrentHashMap<>();
+ ImmutableMap dataSourceProperties = createDefaultDataSourceProperties();
- ImmutableMap dataSourceProperties = ImmutableMap.of("created", DateTimes.nowUtc().toString());
- segments
- .stream()
- .filter(Objects::nonNull)
- .forEach(segment -> {
- newDataSources
- .computeIfAbsent(segment.getDataSource(), dsName -> new DruidDataSource(dsName, dataSourceProperties))
- .addSegmentIfAbsent(segment);
- });
-
- // dataSourcesSnapshot is updated only here, please note that if datasources or segments are enabled or disabled
- // outside of poll, the dataSourcesSnapshot can become invalid until the next poll cycle.
+ // dataSourcesSnapshot is updated only here and the DataSourcesSnapshot object is immutable. If data sources or
+ // segments are marked as used or unused directly (via markAs...() methods in MetadataSegmentManager), the
+ // dataSourcesSnapshot can become invalid until the next database poll.
// DataSourcesSnapshot computes the overshadowed segments, which makes it an expensive operation if the
- // snapshot is invalidated on each segment removal, especially if a user issues a lot of single segment remove
- // calls in rapid succession. So the snapshot update is not done outside of poll at this time.
- // Updates outside of poll(), were primarily for the user experience, so users would immediately see the effect of
- // a segment remove call reflected in MetadataResource API calls. These updates outside of scheduled poll may be
- // added back in removeDataSource and removeSegment methods after the on-demand polling changes from
- // https://github.com/apache/incubator-druid/pull/7653 are in.
- final Map updatedDataSources = CollectionUtils.mapValues(
- newDataSources,
- v -> v.toImmutableDruidDataSource()
+ // snapshot was invalidated on each segment mark as unused or used, especially if a user issues a lot of single
+ // segment mark calls in rapid succession. So the snapshot update is not done outside of database poll at this time.
+ // Updates outside of database polls were primarily for the user experience, so users would immediately see the
+ // effect of a segment mark call reflected in MetadataResource API calls.
+ dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(
+ Iterables.filter(segments, Objects::nonNull), // Filter corrupted entries (see above in this method).
+ dataSourceProperties
);
- dataSourcesSnapshot = new DataSourcesSnapshot(updatedDataSources);
+ }
+
+ private static ImmutableMap createDefaultDataSourceProperties()
+ {
+ return ImmutableMap.of("created", DateTimes.nowUtc().toString());
}
/**
* For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough
* (i. e. promoted to old generation), try to keep them alive. In {@link #poll()}, we fetch and deserialize all
- * existing segments each time, and then replace them in {@link #dataSourcesSnapshot}. This method allows to use already
- * existing (old) segments when possible, effectively interning them a-la {@link String#intern} or {@link
+ * existing segments each time, and then replace them in {@link #dataSourcesSnapshot}. This method allows to use
+ * already existing (old) segments when possible, effectively interning them a-la {@link String#intern} or {@link
* com.google.common.collect.Interner}, aiming to make the majority of {@link DataSegment} objects garbage soon after
* they are deserialized and to die in young generation. It allows to avoid fragmentation of the old generation and
* full GCs.
*/
private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment)
{
- ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot)
- .map(m -> m.getDataSourcesMap().get(segment.getDataSource()))
- .orElse(null);
+ @MonotonicNonNull DataSourcesSnapshot dataSourcesSnapshot = this.dataSourcesSnapshot;
+ if (dataSourcesSnapshot == null) {
+ return segment;
+ }
+ @Nullable ImmutableDruidDataSource dataSource = dataSourcesSnapshot.getDataSource(segment.getDataSource());
if (dataSource == null) {
return segment;
}
@@ -753,11 +1007,7 @@ private String getSegmentsTable()
}
@Override
- public List getUnusedSegmentIntervals(
- final String dataSource,
- final Interval interval,
- final int limit
- )
+ public List getUnusedSegmentIntervals(final String dataSource, final DateTime maxEndTime, final int limit)
{
return connector.inReadOnlyTransaction(
new TransactionCallback>()
@@ -768,7 +1018,8 @@ public List inTransaction(Handle handle, TransactionStatus status)
Iterator iter = handle
.createQuery(
StringUtils.format(
- "SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource and start >= :start and %2$send%2$s <= :end and used = false ORDER BY start, %2$send%2$s",
+ "SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource AND "
+ + "%2$send%2$s <= :end AND used = false ORDER BY start, %2$send%2$s",
getSegmentsTable(),
connector.getQuoteString()
)
@@ -776,8 +1027,7 @@ public List inTransaction(Handle handle, TransactionStatus status)
.setFetchSize(connector.getStreamingFetchSize())
.setMaxRows(limit)
.bind("dataSource", dataSource)
- .bind("start", interval.getStart().toString())
- .bind("end", interval.getEnd().toString())
+ .bind("end", maxEndTime.toString())
.map(
new BaseResultSetMapper()
{
diff --git a/server/src/main/java/org/apache/druid/metadata/UnknownSegmentIdException.java b/server/src/main/java/org/apache/druid/metadata/UnknownSegmentIdException.java
index cca37b9318fa..6362077a2010 100644
--- a/server/src/main/java/org/apache/druid/metadata/UnknownSegmentIdException.java
+++ b/server/src/main/java/org/apache/druid/metadata/UnknownSegmentIdException.java
@@ -19,13 +19,23 @@
package org.apache.druid.metadata;
+import java.util.Collection;
+
/**
- * Exception thrown by MetadataSegmentManager when an segment id is unknown.
+ * Exception thrown by {@link MetadataSegmentManager} when a segment id is unknown.
*/
-public class UnknownSegmentIdException extends RuntimeException
+public class UnknownSegmentIdException extends Exception
{
- public UnknownSegmentIdException(String message)
+ private final Collection unknownSegmentIds;
+
+ UnknownSegmentIdException(Collection segmentIds)
+ {
+ super("Cannot find segment ids " + segmentIds);
+ this.unknownSegmentIds = segmentIds;
+ }
+
+ public Collection getUnknownSegmentIds()
{
- super(message);
+ return unknownSegmentIds;
}
}
diff --git a/server/src/main/java/org/apache/druid/server/JettyUtils.java b/server/src/main/java/org/apache/druid/server/JettyUtils.java
index 9374fb07d931..717eb136b43a 100644
--- a/server/src/main/java/org/apache/druid/server/JettyUtils.java
+++ b/server/src/main/java/org/apache/druid/server/JettyUtils.java
@@ -19,10 +19,16 @@
package org.apache.druid.server;
+import org.apache.druid.java.util.common.logger.Logger;
+
import javax.annotation.Nullable;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.UriInfo;
public class JettyUtils
{
+ private static final Logger log = new Logger(JettyUtils.class);
+
/**
* Concatenate URI parts, in a way that is useful for proxy servlets.
*
@@ -46,4 +52,38 @@ public static String concatenateForRewrite(
return url.toString();
}
+
+ /**
+   * Returns the value of the query parameter of the given name. If not found, but there is a value corresponding to
+   * the parameter of the given compatiblityName, it is returned instead and a warning is logged suggesting to make
+   * queries using the new parameter name.
+ *
+   * This method is useful for renaming query parameters (from compatiblityName to name) while preserving backward
+ * compatibility of the REST API.
+ */
+ @Nullable
+ public static String getQueryParam(UriInfo uriInfo, String name, String compatiblityName)
+ {
+ MultivaluedMap queryParameters = uriInfo.getQueryParameters();
+ // Returning the first value, according to the @QueryParam spec:
+ // https://docs.oracle.com/javaee/7/api/javax/ws/rs/QueryParam.html:
+ // "If the type is not one of the collection types listed in 5 above and the query parameter is represented by
+ // multiple values then the first value (lexically) of the parameter is used."
+ String paramValue = queryParameters.getFirst(name);
+ if (paramValue != null) {
+ return paramValue;
+ }
+ String compatibilityParamValue = queryParameters.getFirst(compatiblityName);
+ if (compatibilityParamValue != null) {
+ log.warn(
+ "Parameter %s in %s query has been renamed to %s. Use the new parameter name.",
+ compatiblityName,
+ uriInfo.getPath(),
+ name
+ );
+ return compatibilityParamValue;
+ }
+    // Found neither name nor compatiblityName among the query parameters.
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index 14bf3395add3..ada41d22bb88 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -48,7 +48,7 @@ public class CoordinatorDynamicConfig
{
public static final String CONFIG_KEY = "coordinator.config";
- private final long millisToWaitBeforeDeleting;
+ private final long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
private final long mergeBytesLimit;
private final int mergeSegmentsLimit;
private final int maxSegmentsToMove;
@@ -56,13 +56,29 @@ public class CoordinatorDynamicConfig
private final int replicationThrottleLimit;
private final int balancerComputeThreads;
private final boolean emitBalancingStats;
- private final boolean killAllDataSources;
- private final Set killableDataSources;
+
+ /**
+ * If true, {@link org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller} sends kill tasks for
+ * unused segments in all data sources.
+ */
+ private final boolean killUnusedSegmentsInAllDataSources;
+
+ /**
+ * List of specific data sources for which kill tasks are sent in {@link
+ * org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller}.
+ */
+ private final Set specificDataSourcesToKillUnusedSegmentsIn;
private final Set decommissioningNodes;
private final int decommissioningMaxPercentOfMaxSegmentsToMove;
- // The pending segments of the dataSources in this list are not killed.
- private final Set protectedPendingSegmentDatasources;
+ /**
+ * Stale pending segments belonging to the data sources in this list are not killed by {@link
+ * DruidCoordinatorCleanupPendingSegments}. In other words, segments in these data sources are "protected".
+ *
+ * Pending segments are considered "stale" when their created_time is older than {@link
+ * DruidCoordinatorCleanupPendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now.
+ */
+ private final Set dataSourcesToNotKillStalePendingSegmentsIn;
/**
* The maximum number of segments that could be queued for loading to any given server.
@@ -74,7 +90,10 @@ public class CoordinatorDynamicConfig
@JsonCreator
public CoordinatorDynamicConfig(
- @JsonProperty("millisToWaitBeforeDeleting") long millisToWaitBeforeDeleting,
+ // Keeping the legacy 'millisToWaitBeforeDeleting' property name for backward compatibility. When the project is
+ // updated to Jackson 2.9 it could be changed, see https://github.com/apache/incubator-druid/issues/7152
+ @JsonProperty("millisToWaitBeforeDeleting")
+ long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
@JsonProperty("mergeBytesLimit") long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
@@ -82,19 +101,26 @@ public CoordinatorDynamicConfig(
@JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") int balancerComputeThreads,
@JsonProperty("emitBalancingStats") boolean emitBalancingStats,
-
- // Type is Object here so that we can support both string and list as
- // coordinator console can not send array of strings in the update request.
- // See https://github.com/apache/incubator-druid/issues/3055
- @JsonProperty("killDataSourceWhitelist") Object killableDataSources,
- @JsonProperty("killAllDataSources") boolean killAllDataSources,
- @JsonProperty("killPendingSegmentsSkipList") Object protectedPendingSegmentDatasources,
+ // Type is Object here so that we can support both string and list as Coordinator console can not send array of
+ // strings in the update request. See https://github.com/apache/incubator-druid/issues/3055.
+ // Keeping the legacy 'killDataSourceWhitelist' property name for backward compatibility. When the project is
+ // updated to Jackson 2.9 it could be changed, see https://github.com/apache/incubator-druid/issues/7152
+ @JsonProperty("killDataSourceWhitelist") Object specificDataSourcesToKillUnusedSegmentsIn,
+ // Keeping the legacy 'killAllDataSources' property name for backward compatibility. When the project is
+ // updated to Jackson 2.9 it could be changed, see https://github.com/apache/incubator-druid/issues/7152
+ @JsonProperty("killAllDataSources") boolean killUnusedSegmentsInAllDataSources,
+ // Type is Object here so that we can support both string and list as Coordinator console can not send array of
+ // strings in the update request, as well as for specificDataSourcesToKillUnusedSegmentsIn.
+ // Keeping the legacy 'killPendingSegmentsSkipList' property name for backward compatibility. When the project is
+ // updated to Jackson 2.9 it could be changed, see https://github.com/apache/incubator-druid/issues/7152
+ @JsonProperty("killPendingSegmentsSkipList") Object dataSourcesToNotKillStalePendingSegmentsIn,
@JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue,
@JsonProperty("decommissioningNodes") Object decommissioningNodes,
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove
)
{
- this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
+ this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
@@ -102,9 +128,10 @@ public CoordinatorDynamicConfig(
this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
this.emitBalancingStats = emitBalancingStats;
- this.killAllDataSources = killAllDataSources;
- this.killableDataSources = parseJsonStringOrArray(killableDataSources);
- this.protectedPendingSegmentDatasources = parseJsonStringOrArray(protectedPendingSegmentDatasources);
+ this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources;
+ this.specificDataSourcesToKillUnusedSegmentsIn = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
+ this.dataSourcesToNotKillStalePendingSegmentsIn =
+ parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn);
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
this.decommissioningNodes = parseJsonStringOrArray(decommissioningNodes);
Preconditions.checkArgument(
@@ -113,8 +140,10 @@ public CoordinatorDynamicConfig(
);
this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove;
- if (this.killAllDataSources && !this.killableDataSources.isEmpty()) {
- throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist");
+ if (this.killUnusedSegmentsInAllDataSources && !this.specificDataSourcesToKillUnusedSegmentsIn.isEmpty()) {
+ throw new IAE(
+ "can't have killUnusedSegmentsInAllDataSources and non-empty specificDataSourcesToKillUnusedSegmentsIn"
+ );
}
}
@@ -152,10 +181,10 @@ public static CoordinatorDynamicConfig current(final JacksonConfigManager config
return Preconditions.checkNotNull(watch(configManager).get(), "Got null config from watcher?!");
}
- @JsonProperty
- public long getMillisToWaitBeforeDeleting()
+ @JsonProperty("millisToWaitBeforeDeleting")
+ public long getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
{
- return millisToWaitBeforeDeleting;
+ return leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
}
@JsonProperty
@@ -200,30 +229,22 @@ public int getBalancerComputeThreads()
return balancerComputeThreads;
}
- /**
- * List of dataSources for which kill tasks are sent in
- * {@link org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller}.
- */
@JsonProperty("killDataSourceWhitelist")
- public Set getKillableDataSources()
+ public Set getSpecificDataSourcesToKillUnusedSegmentsIn()
{
- return killableDataSources;
+ return specificDataSourcesToKillUnusedSegmentsIn;
}
- @JsonProperty
- public boolean isKillAllDataSources()
+ @JsonProperty("killAllDataSources")
+ public boolean isKillUnusedSegmentsInAllDataSources()
{
- return killAllDataSources;
+ return killUnusedSegmentsInAllDataSources;
}
- /**
- * List of dataSources for which pendingSegments are NOT cleaned up
- * in {@link DruidCoordinatorCleanupPendingSegments}.
- */
- @JsonProperty
- public Set getProtectedPendingSegmentDatasources()
+ @JsonProperty("killPendingSegmentsSkipList")
+ public Set getDataSourcesToNotKillStalePendingSegmentsIn()
{
- return protectedPendingSegmentDatasources;
+ return dataSourcesToNotKillStalePendingSegmentsIn;
}
@JsonProperty
@@ -233,9 +254,9 @@ public int getMaxSegmentsInNodeLoadingQueue()
}
/**
- * List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers,
- * and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by
- * {@link CoordinatorDynamicConfig#getDecommissioningMaxPercentOfMaxSegmentsToMove}.
+ * List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning'
+ * servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate
+ * specified by {@link CoordinatorDynamicConfig#getDecommissioningMaxPercentOfMaxSegmentsToMove}.
*
* @return list of host:port entries
*/
@@ -270,7 +291,8 @@ public int getDecommissioningMaxPercentOfMaxSegmentsToMove()
public String toString()
{
return "CoordinatorDynamicConfig{" +
- "millisToWaitBeforeDeleting=" + millisToWaitBeforeDeleting +
+ "leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments="
+ + leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments +
", mergeBytesLimit=" + mergeBytesLimit +
", mergeSegmentsLimit=" + mergeSegmentsLimit +
", maxSegmentsToMove=" + maxSegmentsToMove +
@@ -278,9 +300,9 @@ public String toString()
", replicationThrottleLimit=" + replicationThrottleLimit +
", balancerComputeThreads=" + balancerComputeThreads +
", emitBalancingStats=" + emitBalancingStats +
- ", killAllDataSources=" + killAllDataSources +
- ", killDataSourceWhitelist=" + killableDataSources +
- ", protectedPendingSegmentDatasources=" + protectedPendingSegmentDatasources +
+ ", killUnusedSegmentsInAllDataSources=" + killUnusedSegmentsInAllDataSources +
+ ", specificDataSourcesToKillUnusedSegmentsIn=" + specificDataSourcesToKillUnusedSegmentsIn +
+ ", dataSourcesToNotKillStalePendingSegmentsIn=" + dataSourcesToNotKillStalePendingSegmentsIn +
", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue +
", decommissioningNodes=" + decommissioningNodes +
", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove +
@@ -299,7 +321,8 @@ public boolean equals(Object o)
CoordinatorDynamicConfig that = (CoordinatorDynamicConfig) o;
- if (millisToWaitBeforeDeleting != that.millisToWaitBeforeDeleting) {
+ if (leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments !=
+ that.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments) {
return false;
}
if (mergeBytesLimit != that.mergeBytesLimit) {
@@ -323,16 +346,16 @@ public boolean equals(Object o)
if (emitBalancingStats != that.emitBalancingStats) {
return false;
}
- if (killAllDataSources != that.killAllDataSources) {
+ if (killUnusedSegmentsInAllDataSources != that.killUnusedSegmentsInAllDataSources) {
return false;
}
if (maxSegmentsInNodeLoadingQueue != that.maxSegmentsInNodeLoadingQueue) {
return false;
}
- if (!Objects.equals(killableDataSources, that.killableDataSources)) {
+ if (!Objects.equals(specificDataSourcesToKillUnusedSegmentsIn, that.specificDataSourcesToKillUnusedSegmentsIn)) {
return false;
}
- if (!Objects.equals(protectedPendingSegmentDatasources, that.protectedPendingSegmentDatasources)) {
+ if (!Objects.equals(dataSourcesToNotKillStalePendingSegmentsIn, that.dataSourcesToNotKillStalePendingSegmentsIn)) {
return false;
}
if (!Objects.equals(decommissioningNodes, that.decommissioningNodes)) {
@@ -345,7 +368,7 @@ public boolean equals(Object o)
public int hashCode()
{
return Objects.hash(
- millisToWaitBeforeDeleting,
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
mergeBytesLimit,
mergeSegmentsLimit,
maxSegmentsToMove,
@@ -353,10 +376,10 @@ public int hashCode()
replicationThrottleLimit,
balancerComputeThreads,
emitBalancingStats,
- killAllDataSources,
+ killUnusedSegmentsInAllDataSources,
maxSegmentsInNodeLoadingQueue,
- killableDataSources,
- protectedPendingSegmentDatasources,
+ specificDataSourcesToKillUnusedSegmentsIn,
+ dataSourcesToNotKillStalePendingSegmentsIn,
decommissioningNodes,
decommissioningMaxPercentOfMaxSegmentsToMove
);
@@ -369,19 +392,20 @@ public static Builder builder()
public static class Builder
{
- private static final long DEFAULT_MILLIS_TO_WAIT_BEFORE_DELETING = TimeUnit.MINUTES.toMillis(15);
- private static final long DEFAULT_MERGE_BYTES_LIMIT = 524288000L;
+ private static final long DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS =
+ TimeUnit.MINUTES.toMillis(15);
+ private static final long DEFAULT_MERGE_BYTES_LIMIT = 524_288_000L;
private static final int DEFAULT_MERGE_SEGMENTS_LIMIT = 100;
private static final int DEFAULT_MAX_SEGMENTS_TO_MOVE = 5;
private static final int DEFAULT_REPLICANT_LIFETIME = 15;
private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10;
private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1;
private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
- private static final boolean DEFAULT_KILL_ALL_DATA_SOURCES = false;
+ private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false;
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
- private Long millisToWaitBeforeDeleting;
+ private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
private Long mergeBytesLimit;
private Integer mergeSegmentsLimit;
private Integer maxSegmentsToMove;
@@ -389,9 +413,9 @@ public static class Builder
private Integer replicationThrottleLimit;
private Boolean emitBalancingStats;
private Integer balancerComputeThreads;
- private Object killableDataSources;
- private Boolean killAllDataSources;
- private Object killPendingSegmentsSkipList;
+ private Object specificDataSourcesToKillUnusedSegmentsIn;
+ private Boolean killUnusedSegmentsInAllDataSources;
+ private Object dataSourcesToNotKillStalePendingSegmentsIn;
private Integer maxSegmentsInNodeLoadingQueue;
private Object decommissioningNodes;
private Integer decommissioningMaxPercentOfMaxSegmentsToMove;
@@ -402,7 +426,8 @@ public Builder()
@JsonCreator
public Builder(
- @JsonProperty("millisToWaitBeforeDeleting") @Nullable Long millisToWaitBeforeDeleting,
+ @JsonProperty("millisToWaitBeforeDeleting")
+ @Nullable Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
@JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove,
@@ -410,15 +435,17 @@ public Builder(
@JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads,
@JsonProperty("emitBalancingStats") @Nullable Boolean emitBalancingStats,
- @JsonProperty("killDataSourceWhitelist") @Nullable Object killableDataSources,
- @JsonProperty("killAllDataSources") @Nullable Boolean killAllDataSources,
- @JsonProperty("killPendingSegmentsSkipList") @Nullable Object killPendingSegmentsSkipList,
+ @JsonProperty("killDataSourceWhitelist") @Nullable Object specificDataSourcesToKillUnusedSegmentsIn,
+ @JsonProperty("killAllDataSources") @Nullable Boolean killUnusedSegmentsInAllDataSources,
+ @JsonProperty("killPendingSegmentsSkipList") @Nullable Object dataSourcesToNotKillStalePendingSegmentsIn,
@JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue,
@JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes,
- @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove
+ @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove")
+ @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove
)
{
- this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
+ this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
@@ -426,17 +453,17 @@ public Builder(
this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = balancerComputeThreads;
this.emitBalancingStats = emitBalancingStats;
- this.killAllDataSources = killAllDataSources;
- this.killableDataSources = killableDataSources;
- this.killPendingSegmentsSkipList = killPendingSegmentsSkipList;
+ this.specificDataSourcesToKillUnusedSegmentsIn = specificDataSourcesToKillUnusedSegmentsIn;
+ this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources;
+ this.dataSourcesToNotKillStalePendingSegmentsIn = dataSourcesToNotKillStalePendingSegmentsIn;
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
this.decommissioningNodes = decommissioningNodes;
this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove;
}
- public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
+ public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis)
{
- this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
+ this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = leadingTimeMillis;
return this;
}
@@ -482,15 +509,15 @@ public Builder withEmitBalancingStats(boolean emitBalancingStats)
return this;
}
- public Builder withKillDataSourceWhitelist(Set killDataSourceWhitelist)
+ public Builder withSpecificDataSourcesToKillUnusedSegmentsIn(Set dataSources)
{
- this.killableDataSources = killDataSourceWhitelist;
+ this.specificDataSourcesToKillUnusedSegmentsIn = dataSources;
return this;
}
- public Builder withKillAllDataSources(boolean killAllDataSources)
+ public Builder withKillUnusedSegmentsInAllDataSources(boolean killUnusedSegmentsInAllDataSources)
{
- this.killAllDataSources = killAllDataSources;
+ this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources;
return this;
}
@@ -515,7 +542,9 @@ public Builder withDecommissioningMaxPercentOfMaxSegmentsToMove(Integer percent)
public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
- millisToWaitBeforeDeleting == null ? DEFAULT_MILLIS_TO_WAIT_BEFORE_DELETING : millisToWaitBeforeDeleting,
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
+ ? DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS
+ : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : mergeBytesLimit,
mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : mergeSegmentsLimit,
maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : maxSegmentsToMove,
@@ -523,9 +552,11 @@ public CoordinatorDynamicConfig build()
replicationThrottleLimit == null ? DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads,
emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS : emitBalancingStats,
- killableDataSources,
- killAllDataSources == null ? DEFAULT_KILL_ALL_DATA_SOURCES : killAllDataSources,
- killPendingSegmentsSkipList,
+ specificDataSourcesToKillUnusedSegmentsIn,
+ killUnusedSegmentsInAllDataSources == null
+ ? DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES
+ : killUnusedSegmentsInAllDataSources,
+ dataSourcesToNotKillStalePendingSegmentsIn,
maxSegmentsInNodeLoadingQueue == null
? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
: maxSegmentsInNodeLoadingQueue,
@@ -539,7 +570,9 @@ public CoordinatorDynamicConfig build()
public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
{
return new CoordinatorDynamicConfig(
- millisToWaitBeforeDeleting == null ? defaults.getMillisToWaitBeforeDeleting() : millisToWaitBeforeDeleting,
+ leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
+ ? defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
+ : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit,
mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit,
maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove,
@@ -547,11 +580,15 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
balancerComputeThreads == null ? defaults.getBalancerComputeThreads() : balancerComputeThreads,
emitBalancingStats == null ? defaults.emitBalancingStats() : emitBalancingStats,
- killableDataSources == null ? defaults.getKillableDataSources() : killableDataSources,
- killAllDataSources == null ? defaults.isKillAllDataSources() : killAllDataSources,
- killPendingSegmentsSkipList == null
- ? defaults.getProtectedPendingSegmentDatasources()
- : killPendingSegmentsSkipList,
+ specificDataSourcesToKillUnusedSegmentsIn == null
+ ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()
+ : specificDataSourcesToKillUnusedSegmentsIn,
+ killUnusedSegmentsInAllDataSources == null
+ ? defaults.isKillUnusedSegmentsInAllDataSources()
+ : killUnusedSegmentsInAllDataSources,
+ dataSourcesToNotKillStalePendingSegmentsIn == null
+ ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn()
+ : dataSourcesToNotKillStalePendingSegmentsIn,
maxSegmentsInNodeLoadingQueue == null
? defaults.getMaxSegmentsInNodeLoadingQueue()
: maxSegmentsInNodeLoadingQueue,
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
index d2d3029b17b3..6f2373b3f8e9 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
@@ -117,7 +117,7 @@ public static double intervalCost(double x1, double y0, double y1)
// since x_0 <= y_0, Y must overlap X if y_0 < x_1
if (y0 < x1) {
- /**
+ /*
* We have two possible cases of overlap:
*
* X = [ A )[ B )[ C ) or [ A )[ B )
@@ -151,7 +151,7 @@ public static double intervalCost(double x1, double y0, double y1)
intervalCost(beta, beta, gamma) + // cost(B, C)
2 * (beta + FastMath.exp(-beta) - 1); // cost(B, B)
} else {
- /**
+ /*
* In the case where there is no overlap:
*
* Given that x_0 <= y_0,
@@ -258,9 +258,14 @@ public double calculateInitialTotalCost(final List serverHolders)
{
double cost = 0;
for (ServerHolder server : serverHolders) {
- Iterable segments = server.getServer().getLazyAllSegments();
- for (DataSegment s : segments) {
- cost += computeJointSegmentsCost(s, segments);
+ // segments are dumped into an array because it's probably better than iterating the iterateAllSegments() result
+ // quadratically in a loop, which can generate garbage in the form of Stream, Spliterator, Iterator, etc. objects
+ // whose total memory volume exceeds the size of the DataSegment array.
+ DataSegment[] segments = server.getServer().iterateAllSegments().toArray(new DataSegment[0]);
+ for (DataSegment s1 : segments) {
+ for (DataSegment s2 : segments) {
+ cost += computeJointSegmentsCost(s1, s2);
+ }
}
}
return cost;
@@ -280,7 +285,7 @@ public double calculateNormalization(final List serverHolders)
{
double cost = 0;
for (ServerHolder server : serverHolders) {
- for (DataSegment segment : server.getServer().getLazyAllSegments()) {
+ for (DataSegment segment : server.getServer().iterateAllSegments()) {
cost += computeJointSegmentsCost(segment, segment);
}
}
@@ -288,10 +293,7 @@ public double calculateNormalization(final List serverHolders)
}
@Override
- public void emitStats(
- String tier,
- CoordinatorStats stats, List serverHolderList
- )
+ public void emitStats(String tier, CoordinatorStats stats, List serverHolderList)
{
final double initialTotalCost = calculateInitialTotalCost(serverHolderList);
final double normalization = calculateNormalization(serverHolderList);
@@ -334,7 +336,7 @@ protected double computeCost(
// the sum of the costs of other (exclusive of the proposalSegment) segments on the server
cost += computeJointSegmentsCost(
proposalSegment,
- Iterables.filter(server.getServer().getLazyAllSegments(), segment -> !proposalSegment.equals(segment))
+ Iterables.filter(server.getServer().iterateAllSegments(), segment -> !proposalSegment.equals(segment))
);
// plus the costs of segments that will be loaded
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java
index dff28710a006..8e559b4a2741 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java
@@ -47,8 +47,8 @@ protected double computeCost(
}
int nSegments = 1;
- if (server.getServer().getLazyAllSegments().size() > 0) {
- nSegments = server.getServer().getLazyAllSegments().size();
+ if (server.getServer().getNumSegments() > 0) {
+ nSegments = server.getServer().getNumSegments();
}
double normalizedCost = cost / nSegments;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
index 586f92b2dd79..2001b3fa4e7f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
@@ -22,11 +22,13 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -35,7 +37,6 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
/**
* Contains a representation of the current state of the cluster by tier.
@@ -43,6 +44,16 @@
*/
public class DruidCluster
{
+ /** This static factory method must be called only from inside DruidClusterBuilder in tests. */
+ @VisibleForTesting
+ static DruidCluster createDruidClusterFromBuilderInTest(
+ @Nullable Set realtimes,
+ Map> historicals
+ )
+ {
+ return new DruidCluster(realtimes, historicals);
+ }
+
private final Set