Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`killTask/availableSlot/count`| Number of available task slots that can be used for auto kill tasks in the auto kill run. This is the max number of task slots minus any currently running auto kill tasks. | |Varies|
|`killTask/maxSlot/count`| Maximum number of task slots available for auto kill tasks in the auto kill run. | |Varies|
|`kill/task/count`| Number of tasks issued in the auto kill run. | |Varies|
|`kill/pendingSegments/count`| Number of stale pending segments deleted from the metadata store. |`dataSource`|Varies|
|`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
|`segment/waitCompact/count`|Total number of segments of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
|`interval/waitCompact/count`|Total number of intervals of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.druid.java.util.common;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import io.netty.util.SuppressForbidden;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
Expand All @@ -30,6 +32,7 @@
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -194,6 +197,34 @@ public static boolean canCompareAsString(final DateTime dateTime)
&& ISOChronology.getInstanceUTC().equals(dateTime.getChronology());
}

/**
 * Returns the earlier of the two given dates, treating a null as "no value":
 * if exactly one argument is null, the non-null argument is returned; if both
 * are null, null is returned.
 */
public static DateTime earlierOf(DateTime a, DateTime b)
{
  if (a == null) {
    return b;
  } else if (b == null) {
    return a;
  }
  // Ties return b, matching natural-order comparison semantics.
  return a.isBefore(b) ? a : b;
}

/**
 * Returns the later of the two given dates, treating a null as "no value":
 * if exactly one argument is null, the non-null argument is returned; if both
 * are null, null is returned.
 */
public static DateTime laterOf(DateTime a, DateTime b)
{
  if (a == null) {
    return b;
  } else if (b == null) {
    return a;
  }
  // Ties return b, matching natural-order comparison semantics.
  return a.isAfter(b) ? a : b;
}

private DateTimes()
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,34 @@ public void testCanCompareAsString()
DateTimes.of("2000").withZone(DateTimes.inferTzFromString("America/Los_Angeles")))
);
}

@Test
public void testEarlierOf()
{
  // Two nulls have no earlier value
  Assert.assertNull(DateTimes.earlierOf(null, null));

  // A null argument never wins over a non-null one, in either position
  final DateTime earlierDate = DateTimes.of("2013-01-14");
  Assert.assertEquals(earlierDate, DateTimes.earlierOf(null, earlierDate));
  Assert.assertEquals(earlierDate, DateTimes.earlierOf(earlierDate, null));
  Assert.assertEquals(earlierDate, DateTimes.earlierOf(earlierDate, earlierDate));

  // With two distinct dates, the smaller one is returned regardless of order
  final DateTime laterDate = DateTimes.of("2013-01-15");
  Assert.assertEquals(earlierDate, DateTimes.earlierOf(laterDate, earlierDate));
  Assert.assertEquals(earlierDate, DateTimes.earlierOf(earlierDate, laterDate));
}

@Test
public void testLaterOf()
{
  // Two nulls have no later value
  Assert.assertNull(DateTimes.laterOf(null, null));

  // A null argument never wins over a non-null one, in either position
  final DateTime earlierDate = DateTimes.of("2013-01-14");
  Assert.assertEquals(earlierDate, DateTimes.laterOf(null, earlierDate));
  Assert.assertEquals(earlierDate, DateTimes.laterOf(earlierDate, null));
  Assert.assertEquals(earlierDate, DateTimes.laterOf(earlierDate, earlierDate));

  // With two distinct dates, the bigger one is returned regardless of order
  final DateTime laterDate = DateTimes.of("2013-01-15");
  Assert.assertEquals(laterDate, DateTimes.laterOf(laterDate, earlierDate));
  Assert.assertEquals(laterDate, DateTimes.laterOf(earlierDate, laterDate));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.utils.JvmUtils;

Expand Down Expand Up @@ -77,19 +76,13 @@ public class CoordinatorDynamicConfig
private final Map<Dimension, String> validDebugDimensions;

/**
* Stale pending segments belonging to the data sources in this list are not killed by {@link
* Stale pending segments belonging to the data sources in this list are not killed by {@code
* KillStalePendingSegments}. In other words, segments in these data sources are "protected".
* <p>
* Pending segments are considered "stale" when their created_time is older than {@link
* KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now.
*/
private final Set<String> dataSourcesToNotKillStalePendingSegmentsIn;

/**
* The maximum number of segments that can be queued for loading to any given server.
*
* @see LoadQueuePeon
* @see org.apache.druid.server.coordinator.rules.LoadRule#run
*/
private final int maxSegmentsInNodeLoadingQueue;
private final boolean pauseCoordination;
Expand Down Expand Up @@ -576,6 +569,12 @@ public Builder withSpecificDataSourcesToKillUnusedSegmentsIn(Set<String> dataSou
return this;
}

/**
 * Sets the datasources whose stale pending segments must NOT be killed by the
 * pending-segment cleanup duty, i.e. the "protected" datasources stored in
 * {@code dataSourcesToNotKillStalePendingSegmentsIn}.
 *
 * @param datasources names of the protected datasources
 * @return this builder, for chaining
 */
public Builder withDatasourcesToNotKillPendingSegmentsIn(Set<String> datasources)
{
  this.dataSourcesToNotKillStalePendingSegmentsIn = datasources;
  return this;
}

public Builder withKillTaskSlotRatio(Double killTaskSlotRatio)
{
this.killTaskSlotRatio = killTaskSlotRatio;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.duty;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Duty to kill stale pending segments which are not needed anymore. Pending segments
* are created when appending realtime or batch tasks allocate segments to build
* incremental indexes. Under normal operation, these pending segments get committed
* when the task completes and become regular segments. But in case of task failures,
* some pending segments might be left around and cause clutter in the metadata store.
* <p>
* While cleaning up, this duty ensures that the following pending segments are
* retained for at least {@link #DURATION_TO_RETAIN}:
* <ul>
* <li>Pending segments created by any active task (across all datasources)</li>
* <li>Pending segments created by the latest completed task (across all datasources)</li>
* </ul>
*/
public class KillStalePendingSegments implements CoordinatorDuty
{
private static final Logger log = new Logger(KillStalePendingSegments.class);
private static final Period DURATION_TO_RETAIN = new Period("P1D");

private final OverlordClient overlordClient;

@Inject
public KillStalePendingSegments(OverlordClient overlordClient)
{
this.overlordClient = overlordClient;
}

@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
final Set<String> killDatasources = new HashSet<>(
params.getUsedSegmentsTimelinesPerDataSource().keySet()
);
killDatasources.removeAll(
params.getCoordinatorDynamicConfig()
.getDataSourcesToNotKillStalePendingSegmentsIn()
);

final DateTime minCreatedTime = getMinCreatedTimeToRetain();
for (String dataSource : killDatasources) {
int pendingSegmentsKilled = FutureUtils.getUnchecked(
overlordClient.killPendingSegments(
dataSource,
new Interval(DateTimes.MIN, minCreatedTime)
),
true
);
if (pendingSegmentsKilled > 0) {
log.info(
"Killed [%d] pendingSegments created before [%s] for datasource[%s].",
pendingSegmentsKilled, minCreatedTime, dataSource
);
params.getCoordinatorStats().add(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add the stats to emit metrics even when the number of pending segments killed is 0?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just avoiding zero value metric emissions for now, as this is a new metric.

Stats.Kill.PENDING_SEGMENTS,
RowKey.of(Dimension.DATASOURCE, dataSource),
pendingSegmentsKilled
);
}
}
return params;
}

/**
* Computes the minimum created time of retainable pending segments. Any pending
* segment created before this time is considered stale and can be safely deleted.
* The limit is determined to ensure that pending segments created by any active
* task and the latest completed task (across all datasources) are retained for
* at least {@link #DURATION_TO_RETAIN}.
*/
private DateTime getMinCreatedTimeToRetain()
{
// Fetch the statuses of all active tasks and the latest completed task
// (The Overlord API returns complete tasks in descending order of created_date.)
final List<TaskStatusPlus> statuses = ImmutableList.copyOf(
FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 1), true)
);

DateTime earliestActiveTaskStart = DateTimes.nowUtc();
DateTime latestCompletedTaskStart = null;
for (TaskStatusPlus status : statuses) {
if (status.getStatusCode() == null) {
// Unknown status
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be a change in logic here as the created time was processed in the earlier code:
.filter(status -> status.getStatusCode() == null || !status.getStatusCode().isComplete())

Copy link
Copy Markdown
Contributor Author

@kfaraz kfaraz Sep 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thanks for catching this! This is why we should always have some unit tests.

Copy link
Copy Markdown
Contributor Author

@kfaraz kfaraz Sep 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have made the changes but is it okay with you if I include it in a separate PR? I don't want to trigger the whole CI pipeline for this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm ok with it.

Copy link
Copy Markdown
Contributor Author

@kfaraz kfaraz Sep 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in #14891 #14961

} else if (status.getStatusCode().isComplete()) {
latestCompletedTaskStart = DateTimes.laterOf(
latestCompletedTaskStart,
status.getCreatedTime()
);
} else {
earliestActiveTaskStart = DateTimes.earlierOf(
earliestActiveTaskStart,
status.getCreatedTime()
);
}
}

return DateTimes.earlierOf(latestCompletedTaskStart, earliestActiveTaskStart)
.minus(DURATION_TO_RETAIN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public static class Kill
= CoordinatorStat.toDebugAndEmit("killMaxSlots", "killTask/maxSlot/count");
public static final CoordinatorStat SUBMITTED_TASKS
= CoordinatorStat.toDebugAndEmit("killTasks", "kill/task/count");
public static final CoordinatorStat PENDING_SEGMENTS
= CoordinatorStat.toDebugAndEmit("killPendingSegs", "kill/pendingSegments/count");
}

public static class Balancer
Expand Down
Loading