From 878fa705e609a653a34528336c3f0075e2c5f49b Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 15 Oct 2024 15:41:13 -0500 Subject: [PATCH 1/5] supervisor/autoscaler: Fix clearing of collected lags on skipped scale actions --- .../supervisor/SeekableStreamSupervisor.java | 13 ++++++++++--- .../supervisor/autoscaler/LagBasedAutoScaler.java | 10 +++++----- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 1da5b4fbb9cc..05e5a1678b65 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -424,11 +424,13 @@ private class DynamicAllocationTasksNotice implements Notice { Callable scaleAction; ServiceEmitter emitter; + Runnable actionAfterScale; private static final String TYPE = "dynamic_allocation_tasks_notice"; - DynamicAllocationTasksNotice(Callable scaleAction, ServiceEmitter emitter) + DynamicAllocationTasksNotice(Callable scaleAction, Runnable actionAfterScale, ServiceEmitter emitter) { this.scaleAction = scaleAction; + this.actionAfterScale = actionAfterScale; this.emitter = emitter; } @@ -500,6 +502,7 @@ public void handle() boolean allocationSuccess = changeTaskCount(desiredTaskCount); if (allocationSuccess) { + actionAfterScale.run(); dynamicTriggerLastRunTime = nowTime; } } @@ -1260,9 +1263,13 @@ public void tryInit() } } - public Runnable buildDynamicAllocationTask(Callable scaleAction, ServiceEmitter emitter) + public Runnable buildDynamicAllocationTask( + Callable scaleAction, + Runnable actionAfterScale, + ServiceEmitter emitter + ) { - return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter)); + 
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, actionAfterScale, emitter)); } private Runnable buildRunTask() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index ec81c5f9f99b..eef60af927ef 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -86,10 +86,6 @@ public void start() int desiredTaskCount = -1; try { desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue)); - - if (desiredTaskCount != -1) { - lagMetricsQueue.clear(); - } } catch (Exception ex) { log.warn(ex, "Exception while computing desired task count for [%s]", dataSource); @@ -100,6 +96,10 @@ public void start() return desiredTaskCount; }; + Runnable actionAfterScale = () -> { + lagMetricsQueue.clear(); + }; + lagComputationExec.scheduleAtFixedRate( computeAndCollectLag(), lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up @@ -107,7 +107,7 @@ public void start() TimeUnit.MILLISECONDS ); allocationExec.scheduleAtFixedRate( - supervisor.buildDynamicAllocationTask(scaleAction, emitter), + supervisor.buildDynamicAllocationTask(scaleAction, actionAfterScale, emitter), lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig .getLagCollectionRangeMillis(), lagBasedAutoScalerConfig.getScaleActionPeriodMillis(), From f949f8de50805022ff3e1c0a60ac53b3cf624575 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 15 Oct 2024 17:46:37 -0500 Subject: [PATCH 2/5] comments --- .../supervisor/SeekableStreamSupervisor.java | 22 +++++++++++-------- .../autoscaler/LagBasedAutoScaler.java | 15 ++++++++++--- 2 
files changed, 25 insertions(+), 12 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 05e5a1678b65..86c4ba385bdd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -422,15 +422,19 @@ public boolean equals(Object obj) // change taskCount without resubmitting. private class DynamicAllocationTasksNotice implements Notice { - Callable scaleAction; + Callable computeDesiredTaskCount; ServiceEmitter emitter; - Runnable actionAfterScale; + Runnable onSuccessfulScale; private static final String TYPE = "dynamic_allocation_tasks_notice"; - DynamicAllocationTasksNotice(Callable scaleAction, Runnable actionAfterScale, ServiceEmitter emitter) + DynamicAllocationTasksNotice( + Callable computeDesiredTaskCount, + Runnable onSuccessfulScale, + ServiceEmitter emitter + ) { - this.scaleAction = scaleAction; - this.actionAfterScale = actionAfterScale; + this.computeDesiredTaskCount = computeDesiredTaskCount; + this.onSuccessfulScale = onSuccessfulScale; this.emitter = emitter; } @@ -472,7 +476,7 @@ public void handle() return; } } - final Integer desiredTaskCount = scaleAction.call(); + final Integer desiredTaskCount = computeDesiredTaskCount.call(); ServiceMetricEvent.Builder event = ServiceMetricEvent.builder() .setDimension(DruidMetrics.DATASOURCE, dataSource) .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()); @@ -502,7 +506,7 @@ public void handle() boolean allocationSuccess = changeTaskCount(desiredTaskCount); if (allocationSuccess) { - actionAfterScale.run(); + onSuccessfulScale.run(); dynamicTriggerLastRunTime = nowTime; } } @@ -1265,11 +1269,11 @@ public void 
tryInit() public Runnable buildDynamicAllocationTask( Callable scaleAction, - Runnable actionAfterScale, + Runnable onSuccessfulScale, ServiceEmitter emitter ) { - return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, actionAfterScale, emitter)); + return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter)); } private Runnable buildRunTask() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index eef60af927ef..a69c0a288ce5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -96,8 +96,17 @@ public void start() return desiredTaskCount; }; - Runnable actionAfterScale = () -> { - lagMetricsQueue.clear(); + Runnable onSuccessfulScale = () -> { + LOCK.lock(); + try { + lagMetricsQueue.clear(); + } + catch (Exception ex) { + log.warn(ex, "Exception while clearing lags for [%s]", dataSource); + } + finally { + LOCK.unlock(); + } }; lagComputationExec.scheduleAtFixedRate( @@ -107,7 +116,7 @@ public void start() TimeUnit.MILLISECONDS ); allocationExec.scheduleAtFixedRate( - supervisor.buildDynamicAllocationTask(scaleAction, actionAfterScale, emitter), + supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter), lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig .getLagCollectionRangeMillis(), lagBasedAutoScalerConfig.getScaleActionPeriodMillis(), From 40313bc4120184e23bb0e70916217d6d8978c744 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam <35785271+adithyachakilam@users.noreply.github.com> Date: Tue, 15 Oct 2024 16:12:53 -0500 Subject: [PATCH 3/5] supervisor/autoscaler: 
Skip scaling when partitions are less than minTaskCount (#17335) --- .../autoscaler/LagBasedAutoScaler.java | 16 +++--- .../SeekableStreamSupervisorSpecTest.java | 52 +++++++++++++++++++ 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index a69c0a288ce5..648d8a655e92 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -225,17 +225,16 @@ private int computeDesiredTaskCount(List lags) int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount(); int desiredActiveTaskCount; + int partitionCount = supervisor.getPartitionCount(); + if (partitionCount <= 0) { + log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource); + return -1; + } if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) { // Do Scale out int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep(); - int partitionCount = supervisor.getPartitionCount(); - if (partitionCount <= 0) { - log.warn("Partition number for [%s] <= 0 ? 
how can it be?", dataSource); - return -1; - } - int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount); if (currentActiveTaskCount == actualTaskCountMax) { log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].", @@ -257,7 +256,8 @@ private int computeDesiredTaskCount(List lags) if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) { // Do Scale in int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep(); - if (currentActiveTaskCount == lagBasedAutoScalerConfig.getTaskCountMin()) { + int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount); + if (currentActiveTaskCount == actualTaskCountMin) { log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].", dataSource ); @@ -269,7 +269,7 @@ private int computeDesiredTaskCount(List lags) .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount)); return -1; } else { - desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin()); + desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin); } return desiredActiveTaskCount; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index baff5fc765b2..3281360f5806 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -992,6 +992,58 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce autoScaler.stop(); } + @Test + public void 
testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanPartitions() throws InterruptedException + { + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); + Map modifiedScaleInProps = getScaleInProperties(); + + modifiedScaleInProps.put("taskCountMax", 20); + modifiedScaleInProps.put("taskCountMin", 15); + + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( + supervisor, + DATASOURCE, + mapper.convertValue( + modifiedScaleInProps, + LagBasedAutoScalerConfig.class + ), + spec, + emitter + ); + + // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin. 
+ Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); + supervisor.getIoConfig().setTaskCount(2); + supervisor.start(); + autoScaler.start(); + supervisor.runInternal(); + + Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount()); + Thread.sleep(2000); + Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount()); + + autoScaler.reset(); + autoScaler.stop(); + } + @Test public void testSeekableStreamSupervisorSpecWithScaleInAlreadyAtMin() throws InterruptedException { From 9d86763490d700a35afad150e46dced5644bdce5 Mon Sep 17 00:00:00 2001 From: Pranav Date: Tue, 15 Oct 2024 17:50:18 -0700 Subject: [PATCH 4/5] Fix pip installation after ubuntu upgrade (#17358) --- .github/scripts/setup_generate_license.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/scripts/setup_generate_license.sh b/.github/scripts/setup_generate_license.sh index 71583bfb2b26..85a6a5aaa7af 100755 --- a/.github/scripts/setup_generate_license.sh +++ b/.github/scripts/setup_generate_license.sh @@ -18,6 +18,9 @@ set -e sudo apt-get update && sudo apt-get install python3 -y -curl https://bootstrap.pypa.io/pip/3.5/get-pip.py | sudo -H python3 +# creating python virtual env +python3 -m venv ~/.python3venv +source ~/.python3venv/bin/activate +sudo apt install python3-pip pip3 install wheel # install wheel first explicitly pip3 install --upgrade pyyaml From afddb22dfa5a81632496e6f54764e9e501ae9d6b Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 16 Oct 2024 13:44:27 -0500 Subject: [PATCH 5/5] fix tests --- .../indexing/kinesis/supervisor/KinesisSupervisorTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 24d919918f47..fe851a183a26 100644 --- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -358,6 +358,7 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception Thread.sleep(1 * 1000); int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScale); + autoscaler.stop(); } @Test @@ -435,6 +436,7 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception Thread.sleep(1 * 1000); int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountAfterScale); + autoscaler.stop(); } @Test