From 817a7b8582c9925a6648308129b3858b8a8ace81 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Fri, 19 Apr 2024 11:02:37 -0500 Subject: [PATCH 1/3] fixup-16284: Remove the LagMetric class --- .../indexing/kinesis/supervisor/KinesisSupervisor.java | 6 +++--- .../supervisor/autoscaler/LagBasedAutoScaler.java | 9 +-------- .../druid/indexing/overlord/supervisor/Supervisor.java | 8 ++++---- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index e1a7656f23cd..365a9135e3c5 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -39,7 +39,6 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; @@ -429,9 +428,10 @@ protected SeekableStreamDataSourceMetadata createDataSourceMetad } @Override - public LagMetric getLagMetricForAutoScaler() + public long computeLagForAutoScaler() { - return LagMetric.MAX; + LagStats lagStats = computeLagStats(); + return lagStats == null ? 0L : lagStats.getMaxLag(); } private SeekableStreamDataSourceMetadata createDataSourceMetadataWithClosedOrExpiredPartitions( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 7813725733b1..888a3c30c3e4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -21,7 +21,6 @@ import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; -import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.java.util.common.StringUtils; @@ -155,13 +154,7 @@ private Runnable computeAndCollectLag() LOCK.lock(); try { if (!spec.isSuspended()) { - LagStats lagStats = supervisor.computeLagStats(); - if (lagStats == null) { - lagMetricsQueue.offer(0L); - } else { - long lag = lagStats.get(supervisor.getLagMetricForAutoScaler()); - lagMetricsQueue.offer(lag > 0 ? lag : 0L); - } + lagMetricsQueue.offer(supervisor.computeLagForAutoScaler()); log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue); } else { log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 8befa2adae33..9b9511cbf3da 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.DataSourceMetadata; -import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.segment.incremental.ParseExceptionReport; @@ -95,11 +94,12 @@ default Boolean isHealthy() LagStats computeLagStats(); /** - * Used by AutoScaler to either scale by max/total/avg. + * Used by AutoScaler to make scaling decisions. */ - default LagMetric getLagMetricForAutoScaler() + default long computeLagForAutoScaler() { - return LagMetric.TOTAL; + LagStats lagStats = computeLagStats(); + return lagStats == null ? 0L : lagStats.getTotalLag(); } int getActiveTaskGroupsCount(); From 3caba9cf1fc86b846bd12763abf76a06498e1700 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Fri, 19 Apr 2024 13:13:19 -0500 Subject: [PATCH 2/3] commetns --- .../autoscaler/LagBasedAutoScaler.java | 3 +- .../supervisor/autoscaler/LagMetric.java | 27 ------------ .../supervisor/autoscaler/LagStats.java | 13 ------ .../overlord/supervisor/LagStatsTest.java | 42 ------------------- 4 files changed, 2 insertions(+), 83 deletions(-) delete mode 100644 server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java delete mode 100644 server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 888a3c30c3e4..f8618b06f74b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -154,7 +154,8 @@ private Runnable computeAndCollectLag() LOCK.lock(); try { if (!spec.isSuspended()) { - lagMetricsQueue.offer(supervisor.computeLagForAutoScaler()); + long lag = supervisor.computeLagForAutoScaler(); + lagMetricsQueue.offer(lag > 0 ? lag : 0L); log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue); } else { log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java deleted file mode 100644 index d3f00b5c2c83..000000000000 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.overlord.supervisor.autoscaler; - -public enum LagMetric -{ - TOTAL, - MAX, - AVERAGE; -} diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java index c7a6dfc61328..7b6e5fd0bab1 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java @@ -46,17 +46,4 @@ public long getAvgLag() { return avgLag; } - - public long get(LagMetric metric) - { - switch (metric) { - case AVERAGE: - return avgLag; - case TOTAL: - return totalLag; - case MAX: - return maxLag; - } - throw new IllegalStateException("Unknown Metric"); - } } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java deleted file mode 100644 index b51882538b9c..000000000000 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.overlord.supervisor; - -import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; -import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; -import org.junit.Assert; -import org.junit.Test; - -public class LagStatsTest -{ - - @Test - public void lagStatsByMetric() - { - int max = 1; - int avg = 2; - int total = 3; - LagStats lag = new LagStats(max, total, avg); - - Assert.assertEquals(max, lag.get(LagMetric.MAX)); - Assert.assertEquals(total, lag.get(LagMetric.TOTAL)); - Assert.assertEquals(avg, lag.get(LagMetric.AVERAGE)); - } -} From 62416b9e4b906a20922cd94ab77cb775f45da7a5 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Fri, 19 Apr 2024 13:26:20 -0500 Subject: [PATCH 3/3] add test coverage --- .../overlord/supervisor/SupervisorTest.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java new file mode 100644 index 000000000000..79811079d341 --- /dev/null +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor; + +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +public class SupervisorTest +{ + @Test + public void testAutoScalerLagComputation() + { + Supervisor supervisor = Mockito.spy(Supervisor.class); + + Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2, 3)); + Assert.assertEquals(2, supervisor.computeLagForAutoScaler()); + + Mockito.when(supervisor.computeLagStats()).thenReturn(null); + Assert.assertEquals(0, supervisor.computeLagForAutoScaler()); + } +}