From f18edb4619219dbe0dfe209e5fd575adcf835f3c Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 25 Mar 2024 13:56:24 -0500 Subject: [PATCH 1/6] Visibility into skipped scale notices --- docs/operations/metrics.md | 2 ++ .../supervisor/SeekableStreamSupervisor.java | 31 ++++++++++++++----- .../SeekableStreamSupervisorSpec.java | 2 +- .../autoscaler/AutoScalerConfig.java | 3 +- .../autoscaler/LagBasedAutoScaler.java | 13 ++++++-- .../autoscaler/LagBasedAutoScalerConfig.java | 5 +-- .../SeekableStreamSupervisorSpecTest.java | 28 ++++++++++++----- 7 files changed, 62 insertions(+), 22 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 0c7a492d807a..dec640640337 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -258,6 +258,8 @@ batch ingestion emit the following metrics. These metrics are deltas for each em |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s | |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds| |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.| +|`ingest/scale/up/skip`|Count of times LagBasedAutoScaler wanted to scale up but skipped due to `minTriggerScaleActionFrequencyMillis`|`dataSource`, `stream`, `tags`|Depends on auto scaler config.| +|`ingest/scale/up/skip`|Count of times LagBasedAutoScaler wanted to scale down but skipped due to `minTriggerScaleActionFrequencyMillis`|`dataSource`, `stream`, `tags`|Depends on auto scaler config.| If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 507001a7af6d..24e049b2d63f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -403,11 +403,18 @@ public boolean equals(Object obj) private class DynamicAllocationTasksNotice implements Notice { Callable scaleAction; + ServiceEmitter emitter; + ServiceMetricEvent.Builder event; private static final String TYPE = "dynamic_allocation_tasks_notice"; - DynamicAllocationTasksNotice(Callable scaleAction) + DynamicAllocationTasksNotice(Callable scaleAction, ServiceEmitter emitter) { this.scaleAction = scaleAction; + this.emitter = emitter; + this.event = new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()) + .setDimensionIfNotNull(DruidMetrics.TAGS, spec.getContextValue(DruidMetrics.TAGS)); } /** @@ -448,6 +455,7 @@ public void handle() return; } } + final Integer desriedTaskCount = scaleAction.call(); if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) { log.info( "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!", @@ -455,9 +463,15 @@ public void handle() autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource ); + + int currentActiveTasks = currentActiveTaskCount(); + if (desriedTaskCount > currentActiveTasks) { + emitter.emit(event.setMetric("ingest/scale/up/skip", 1)); + } else if (desriedTaskCount < currentActiveTasks && desriedTaskCount > 0) { + emitter.emit(event.setMetric("ingest/scale/down/skip", 1)); + } return; } - final Integer desriedTaskCount = scaleAction.call(); boolean allocationSuccess = changeTaskCount(desriedTaskCount); if (allocationSuccess) { dynamicTriggerLastRunTime = nowTime; @@ -493,9 +507,7 @@ public String getType() private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException { - int currentActiveTaskCount; - Collection activeTaskGroups = activelyReadingTaskGroups.values(); - currentActiveTaskCount = activeTaskGroups.size(); + int currentActiveTaskCount = currentActiveTaskCount(); if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) { return false; @@ -514,6 +526,11 @@ private boolean changeTaskCount(int desiredActiveTaskCount) } } + private int currentActiveTaskCount() + { + return activelyReadingTaskGroups.values().size(); + } + private void changeTaskCountInIOConfig(int desiredActiveTaskCount) { ioConfig.setTaskCount(desiredActiveTaskCount); @@ -1208,9 +1225,9 @@ public void tryInit() } } - public Runnable buildDynamicAllocationTask(Callable scaleAction) + public Runnable buildDynamicAllocationTask(Callable scaleAction, ServiceEmitter emitter) { - return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction)); + return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter)); } private Runnable buildRunTask() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index b7d4ba2f2772..7b5f46195e7b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -167,7 +167,7 @@ public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor) { AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoScalerConfig(); if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) { - return autoScalerConfig.createAutoScaler(supervisor, this); + return autoScalerConfig.createAutoScaler(supervisor, this, emitter); } return new NoopTaskAutoScaler(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java index 53174a17bbac..750e77328f61 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java @@ -26,6 +26,7 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; @UnstableApi @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class) @@ -38,6 +39,6 @@ public interface AutoScalerConfig long getMinTriggerScaleActionFrequencyMillis(); int getTaskCountMax(); int getTaskCountMin(); - SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec); + SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index a06f358435f8..1cb8f38e299b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import java.util.ArrayList; import java.util.List; @@ -45,11 +46,16 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler private final SupervisorSpec spec; private final SeekableStreamSupervisor supervisor; private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig; + private final ServiceEmitter emitter; private static final ReentrantLock LOCK = new ReentrantLock(true); - public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource, - LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec + public LagBasedAutoScaler( + SeekableStreamSupervisor supervisor, + String dataSource, + LagBasedAutoScalerConfig autoScalerConfig, + SupervisorSpec spec, + ServiceEmitter emitter ) { this.lagBasedAutoScalerConfig = autoScalerConfig; @@ -62,6 +68,7 @@ public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d"); this.spec = spec; this.supervisor = supervisor; + this.emitter = emitter; } @Override @@ -93,7 +100,7 @@ public void start() TimeUnit.MILLISECONDS ); allocationExec.scheduleAtFixedRate( - supervisor.buildDynamicAllocationTask(scaleAction), + supervisor.buildDynamicAllocationTask(scaleAction, emitter), lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig .getLagCollectionRangeMillis(), lagBasedAutoScalerConfig.getScaleActionPeriodMillis(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java index 0a4eb0b585e6..e03242de279a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import javax.annotation.Nullable; @@ -154,9 +155,9 @@ public int getTaskCountMin() } @Override - public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec) + public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter) { - return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec); + return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec, emitter); } @JsonProperty diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 12eb3c7e10ab..ecc86b641230 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -56,7 +56,9 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.segment.TestHelper; @@ -129,6 +131,8 @@ public void setUp() monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class); supervisorStateManagerConfig = EasyMock.mock(SupervisorStateManagerConfig.class); supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class); + + EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(null).anyTimes(); } private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor @@ -750,16 +754,22 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); EasyMock.replay(taskMaster); - TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3); + ServiceEmitter dynamicActionEmitter = EasyMock.mock(ServiceEmitter.class); + dynamicActionEmitter.emit(EasyMock.anyObject(ServiceEventBuilder.class)); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.replay(dynamicActionEmitter); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( supervisor, DATASOURCE, mapper.convertValue( - getScaleOutProperties(2), + getScaleOutProperties(10), LagBasedAutoScalerConfig.class ), - spec + spec, + dynamicActionEmitter ); supervisor.start(); autoScaler.start(); @@ -769,7 +779,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScaleOut); - + EasyMock.verify(dynamicActionEmitter); autoScaler.reset(); autoScaler.stop(); } @@ -807,7 +817,8 @@ public void testSeekableStreamSupervisorSpecWithNoScalingOnIdleSupervisor() thro getScaleOutProperties(2), LagBasedAutoScalerConfig.class ), - spec + spec, + emitter ); supervisor.start(); autoScaler.start(); @@ -851,7 +862,8 @@ public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() t getScaleOutProperties(3), LagBasedAutoScalerConfig.class ), - spec + spec, + emitter ); supervisor.start(); autoScaler.start(); @@ -870,7 +882,6 @@ public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() t public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException { EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); - EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes(); EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); @@ -895,7 +906,8 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce getScaleInProperties(), LagBasedAutoScalerConfig.class ), - spec + spec, + emitter ); // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin. From dcd8b77302c03e6d208c006f5679a7c353e879cb Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 25 Mar 2024 20:52:17 -0500 Subject: [PATCH 2/6] comments --- docs/operations/metrics.md | 4 ++-- .../supervisor/SeekableStreamSupervisor.java | 4 ++-- .../SeekableStreamSupervisorSpecTest.java | 12 ++++++++++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index dec640640337..25b69910c301 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -258,8 +258,8 @@ batch ingestion emit the following metrics. These metrics are deltas for each em |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s | |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds| |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.| -|`ingest/scale/up/skip`|Count of times LagBasedAutoScaler wanted to scale up but skipped due to `minTriggerScaleActionFrequencyMillis`|`dataSource`, `stream`, `tags`|Depends on auto scaler config.| -|`ingest/scale/up/skip`|Count of times LagBasedAutoScaler wanted to scale down but skipped due to `minTriggerScaleActionFrequencyMillis`|`dataSource`, `stream`, `tags`|Depends on auto scaler config.| +|`ingest/skipScaleUp/count`|Total number of times `lagBased` auto scaler attempted to scale up but was skipped due to `minTriggerScaleActionFrequencyMillis`.|`dataSource`, `stream`, `tags`|Depends on auto scaler config.| +|`ingest/skipScaleDown/count`|Total number of times `lagBased` auto scaler attempted to scale down but skipped due to `minTriggerScaleActionFrequencyMillis`.|`dataSource`, `stream`, `tags`|Depends on auto scaler config.| If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 24e049b2d63f..09dd15f32472 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -466,9 +466,9 @@ public void handle() int currentActiveTasks = currentActiveTaskCount(); if (desriedTaskCount > currentActiveTasks) { - emitter.emit(event.setMetric("ingest/scale/up/skip", 1)); + emitter.emit(event.setMetric("ingest/skipScaleUp/count", 1)); } else if (desriedTaskCount < currentActiveTasks && desriedTaskCount > 0) { - emitter.emit(event.setMetric("ingest/scale/down/skip", 1)); + emitter.emit(event.setMetric("ingest/skipScaleDown/count", 1)); } return; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index ecc86b641230..2bf0de93fbb3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -56,7 +56,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -65,6 +65,8 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.easymock.Capture; +import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.joda.time.DateTime; @@ -754,8 +756,9 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); EasyMock.replay(taskMaster); + Capture metricCapture = Capture.newInstance(CaptureType.ALL); ServiceEmitter dynamicActionEmitter = EasyMock.mock(ServiceEmitter.class); - dynamicActionEmitter.emit(EasyMock.anyObject(ServiceEventBuilder.class)); + dynamicActionEmitter.emit(EasyMock.capture(metricCapture)); EasyMock.expectLastCall().atLeastOnce(); EasyMock.replay(dynamicActionEmitter); @@ -779,6 +782,11 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScaleOut); + Assert.assertTrue(metricCapture.getValues() + .stream() + .map(metric -> metric.build(ImmutableMap.of())) + .map(ServiceMetricEvent::getMetric) + .anyMatch("ingest/skipScaleUp/count"::equals)); EasyMock.verify(dynamicActionEmitter); autoScaler.reset(); autoScaler.stop(); From daa5c757d92ac63a496f5a097cfeace2639bf578 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 26 Mar 2024 13:59:57 -0500 Subject: [PATCH 3/6] change to emit always instead of just skips --- docs/operations/metrics.md | 3 +- .../supervisor/SeekableStreamSupervisor.java | 40 ++++++++++--------- .../autoscaler/LagBasedAutoScaler.java | 18 +++++++++ .../SeekableStreamSupervisorSpecTest.java | 12 +++--- 4 files changed, 48 insertions(+), 25 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 25b69910c301..396c085902a4 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -258,8 +258,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s | |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds| |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.| -|`ingest/skipScaleUp/count`|Total number of times `lagBased` auto scaler attempted to scale up but was skipped due to `minTriggerScaleActionFrequencyMillis`.|`dataSource`, `stream`, `tags`|Depends on auto scaler config.| -|`ingest/skipScaleDown/count`|Total number of times `lagBased` auto scaler attempted to scale down but skipped due to `minTriggerScaleActionFrequencyMillis`.|`dataSource`, `stream`, `tags`|Depends on auto scaler config.| +|`ingest/autoScaler/lagBased/requiredTasks`|Count of required tasks based on the calculations of `lagBased` auto scaler.|`dataSource`, `stream`, `skipReason`|Depends on auto scaler config.| If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 09dd15f32472..5129f03f3949 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -77,6 +77,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScaler; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.IAE; @@ -404,17 +405,12 @@ private class DynamicAllocationTasksNotice implements Notice { Callable scaleAction; ServiceEmitter emitter; - ServiceMetricEvent.Builder event; private static final String TYPE = "dynamic_allocation_tasks_notice"; DynamicAllocationTasksNotice(Callable scaleAction, ServiceEmitter emitter) { this.scaleAction = scaleAction; this.emitter = emitter; - this.event = new ServiceMetricEvent.Builder() - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()) - .setDimensionIfNotNull(DruidMetrics.TAGS, spec.getContextValue(DruidMetrics.TAGS)); } /** @@ -456,22 +452,33 @@ public void handle() } } final Integer desriedTaskCount = scaleAction.call(); + ServiceMetricEvent.Builder event = ServiceMetricEvent + .builder() + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()); if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) { log.info( - "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!", + "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it! desried task count is [%s], active task count is [%s]", nowTime - dynamicTriggerLastRunTime, autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), - dataSource + dataSource, + desriedTaskCount, + getActiveTaskGroupsCount() ); - int currentActiveTasks = currentActiveTaskCount(); - if (desriedTaskCount > currentActiveTasks) { - emitter.emit(event.setMetric("ingest/skipScaleUp/count", 1)); - } else if (desriedTaskCount < currentActiveTasks && desriedTaskCount > 0) { - emitter.emit(event.setMetric("ingest/skipScaleDown/count", 1)); + if (desriedTaskCount > 0) { + emitter.emit( + event + .setDimension(LagBasedAutoScaler.SKIP_REASON_DIMENSION, "MIN_TRIGGER_SCALE_ACTION_FREQUENCY") + .setMetric(LagBasedAutoScaler.REQUIRED_TASKS_METRIC, desriedTaskCount)); } return; } + + if (desriedTaskCount > 0) { + emitter.emit(event.setMetric(LagBasedAutoScaler.REQUIRED_TASKS_METRIC, desriedTaskCount)); + } + boolean allocationSuccess = changeTaskCount(desriedTaskCount); if (allocationSuccess) { dynamicTriggerLastRunTime = nowTime; @@ -507,7 +514,9 @@ public String getType() private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException { - int currentActiveTaskCount = currentActiveTaskCount(); + int currentActiveTaskCount; + Collection activeTaskGroups = activelyReadingTaskGroups.values(); + currentActiveTaskCount = activeTaskGroups.size(); if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) { return false; @@ -526,11 +535,6 @@ private boolean changeTaskCount(int desiredActiveTaskCount) } } - private int currentActiveTaskCount() - { - return activelyReadingTaskGroups.values().size(); - } - private void changeTaskCountInIOConfig(int desiredActiveTaskCount) { ioConfig.setTaskCount(desiredActiveTaskCount); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 1cb8f38e299b..fc59c1bc9e01 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -28,6 +28,8 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.DruidMetrics; import java.util.ArrayList; import java.util.List; @@ -38,6 +40,8 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler { + public static final String SKIP_REASON_DIMENSION = "skipReason"; + public static final String REQUIRED_TASKS_METRIC = "ingest/autoScaler/lagBased/requiredTasks"; private static final EmittingLogger log = new EmittingLogger(LagBasedAutoScaler.class); private final String dataSource; private final CircularFifoQueue lagMetricsQueue; @@ -47,6 +51,7 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler private final SeekableStreamSupervisor supervisor; private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig; private final ServiceEmitter emitter; + private final ServiceMetricEvent.Builder metricbuilder; private static final ReentrantLock LOCK = new ReentrantLock(true); @@ -69,6 +74,9 @@ public LagBasedAutoScaler( this.spec = spec; this.supervisor = supervisor; this.emitter = emitter; + metricbuilder = ServiceMetricEvent.builder() + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension(DruidMetrics.STREAM, this.supervisor.getIoConfig().getStream()); } @Override @@ -221,6 +229,11 @@ private int computeDesiredTaskCount(List lags) log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].", dataSource ); + emitter.emit( + metricbuilder + .setDimension(SKIP_REASON_DIMENSION, "AT_MAX") + .setMetric(REQUIRED_TASKS_METRIC, taskCount) + ); return -1; } else { desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax); @@ -235,6 +248,11 @@ private int computeDesiredTaskCount(List lags) log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].", dataSource ); + emitter.emit( + metricbuilder + .setDimension(SKIP_REASON_DIMENSION, "AT_MIN") + .setMetric(REQUIRED_TASKS_METRIC, taskCount) + ); return -1; } else { desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 2bf0de93fbb3..6f2ccd25ff35 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -782,11 +782,13 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScaleOut); - Assert.assertTrue(metricCapture.getValues() - .stream() - .map(metric -> metric.build(ImmutableMap.of())) - .map(ServiceMetricEvent::getMetric) - .anyMatch("ingest/skipScaleUp/count"::equals)); + Assert.assertTrue( + metricCapture + .getValues() + .stream() + .map(metric -> metric.build(ImmutableMap.of())) + .map(ServiceMetricEvent::getMetric) + .anyMatch(LagBasedAutoScaler.REQUIRED_TASKS_METRIC::equals)); EasyMock.verify(dynamicActionEmitter); autoScaler.reset(); autoScaler.stop(); From 6e035cf2d5eddc5f0a928ec9d86e31f8e72c77bc Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 26 Mar 2024 17:26:16 -0500 Subject: [PATCH 4/6] fix failing test --- .../seekablestream/SeekableStreamSupervisorSpecTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 6f2ccd25ff35..287daa165398 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -58,7 +58,6 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; -import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.segment.TestHelper; @@ -133,8 +132,6 @@ public void setUp() monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class); supervisorStateManagerConfig = EasyMock.mock(SupervisorStateManagerConfig.class); supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class); - - EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(null).anyTimes(); } private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor @@ -624,9 +621,11 @@ public void testAutoScalerCreated() EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig()) .andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)) .anyTimes(); + EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes(); EasyMock.replay(seekableStreamSupervisorIOConfig); EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes(); + EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); EasyMock.replay(supervisor4); TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec( @@ -697,8 +696,10 @@ public void testDefaultAutoScalerConfigCreatedWithDefault() "1" ), AutoScalerConfig.class)) .anyTimes(); + EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes(); EasyMock.replay(seekableStreamSupervisorIOConfig); + EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes(); EasyMock.replay(supervisor4); From 21ccbd09c6f04c4a6a4072f0a133cf6bb2630995 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 27 Mar 2024 08:54:14 -0500 Subject: [PATCH 5/6] comments --- docs/operations/metrics.md | 2 +- .../supervisor/SeekableStreamSupervisor.java | 29 ++++++++++--------- .../autoscaler/LagBasedAutoScaler.java | 28 +++++++++--------- .../SeekableStreamSupervisorSpecTest.java | 27 ++++++++--------- 4 files changed, 42 insertions(+), 44 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 396c085902a4..e49b9600181b 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -258,7 +258,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s | |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds| |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.| -|`ingest/autoScaler/lagBased/requiredTasks`|Count of required tasks based on the calculations of `lagBased` auto scaler.|`dataSource`, `stream`, `skipReason`|Depends on auto scaler config.| +|`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of `lagBased` auto scaler.|`dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.| If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 5129f03f3949..59cd05bf349d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -77,7 +77,6 @@ import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; -import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScaler; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.IAE; @@ -150,6 +149,8 @@ public abstract class SeekableStreamSupervisor 0) { - emitter.emit( - event - .setDimension(LagBasedAutoScaler.SKIP_REASON_DIMENSION, "MIN_TRIGGER_SCALE_ACTION_FREQUENCY") - .setMetric(LagBasedAutoScaler.REQUIRED_TASKS_METRIC, desriedTaskCount)); + if (desiredTaskCount > 0) { + emitter.emit(event.setDimension( + AUTOSCALER_SKIP_REASON_DIMENSION, + "minTriggerScaleActionFrequencyMillis not elapsed yet" + ) + .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); } return; } - if (desriedTaskCount > 0) { - emitter.emit(event.setMetric(LagBasedAutoScaler.REQUIRED_TASKS_METRIC, desriedTaskCount)); + if (desiredTaskCount > 0) { + emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); } - boolean allocationSuccess = changeTaskCount(desriedTaskCount); + boolean allocationSuccess = changeTaskCount(desiredTaskCount); if (allocationSuccess) { dynamicTriggerLastRunTime = nowTime; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index fc59c1bc9e01..8235f53e33fe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -40,8 +40,6 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler { - public static final String SKIP_REASON_DIMENSION = "skipReason"; - public static final String REQUIRED_TASKS_METRIC = "ingest/autoScaler/lagBased/requiredTasks"; private static final EmittingLogger log = new EmittingLogger(LagBasedAutoScaler.class); private final String dataSource; private final CircularFifoQueue lagMetricsQueue; @@ -51,7 +49,7 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler private final SeekableStreamSupervisor supervisor; private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig; private final ServiceEmitter emitter; - private final ServiceMetricEvent.Builder metricbuilder; + private final ServiceMetricEvent.Builder metricBuilder; private static final ReentrantLock LOCK = new ReentrantLock(true); @@ -74,7 +72,7 @@ public LagBasedAutoScaler( this.spec = spec; this.supervisor = supervisor; this.emitter = emitter; - metricbuilder = ServiceMetricEvent.builder() + metricBuilder = ServiceMetricEvent.builder() .setDimension(DruidMetrics.DATASOURCE, dataSource) .setDimension(DruidMetrics.STREAM, this.supervisor.getIoConfig().getStream()); } @@ -229,11 +227,12 @@ private int computeDesiredTaskCount(List lags) log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].", dataSource ); - emitter.emit( - metricbuilder - .setDimension(SKIP_REASON_DIMENSION, "AT_MAX") - .setMetric(REQUIRED_TASKS_METRIC, taskCount) - ); + emitter.emit(metricBuilder + .setDimension( + SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION, + "Already at max task count" + ) + .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount)); return -1; } else { desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax); @@ -248,11 +247,12 @@ private int computeDesiredTaskCount(List lags) log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].", dataSource ); - emitter.emit( - metricbuilder - .setDimension(SKIP_REASON_DIMENSION, "AT_MIN") - .setMetric(REQUIRED_TASKS_METRIC, taskCount) - ); + emitter.emit(metricBuilder + .setDimension( + SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION, + "Already at min task count" + ) + .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount)); return -1; } else { desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 287daa165398..d7a7b44c2b98 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -56,16 +56,15 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; -import org.easymock.Capture; -import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.joda.time.DateTime; @@ -132,6 +131,8 @@ public void setUp() monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class); supervisorStateManagerConfig = EasyMock.mock(SupervisorStateManagerConfig.class); supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class); + + EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(null).anyTimes(); } private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor @@ -757,11 +758,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); EasyMock.replay(taskMaster); - Capture metricCapture = Capture.newInstance(CaptureType.ALL); - ServiceEmitter dynamicActionEmitter = EasyMock.mock(ServiceEmitter.class); - dynamicActionEmitter.emit(EasyMock.capture(metricCapture)); - EasyMock.expectLastCall().atLeastOnce(); - EasyMock.replay(dynamicActionEmitter); + StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter(); TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); @@ -784,13 +781,13 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScaleOut); Assert.assertTrue( - metricCapture - .getValues() + dynamicActionEmitter + .getMetricEvents() + .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) .stream() - .map(metric -> metric.build(ImmutableMap.of())) - .map(ServiceMetricEvent::getMetric) - .anyMatch(LagBasedAutoScaler.REQUIRED_TASKS_METRIC::equals)); - EasyMock.verify(dynamicActionEmitter); + .map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)) + .filter(Objects::nonNull) + .anyMatch("minTriggerScaleActionFrequencyMillis not elapsed yet"::equals)); autoScaler.reset(); autoScaler.stop(); } @@ -1208,7 +1205,7 @@ private static Map getScaleOutProperties(int maxTaskCount) { HashMap autoScalerConfig = new HashMap<>(); autoScalerConfig.put("enableTaskAutoScaler", true); - autoScalerConfig.put("lagCollectionIntervalMillis", 500); + autoScalerConfig.put("lagCollectionIntervalMillis", 50); autoScalerConfig.put("lagCollectionRangeMillis", 500); autoScalerConfig.put("scaleOutThreshold", 0); autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0); From 7bb15e71237ac4e630bba5cdc95e3f572aad0a9e Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 27 Mar 2024 09:18:45 -0500 Subject: [PATCH 6/6] Add couple more tests --- .../SeekableStreamSupervisorSpecTest.java | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index d7a7b44c2b98..3e0e46d7a033 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -792,6 +792,65 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc autoScaler.stop(); } + @Test + public void testSeekableStreamSupervisorSpecWithScaleOutAlreadyAtMax() throws InterruptedException + { + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, true)).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + + StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter(); + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10) + { + @Override + public int getActiveTaskGroupsCount() + { + return 2; + } + }; + + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( + supervisor, + DATASOURCE, + mapper.convertValue( + getScaleOutProperties(2), + LagBasedAutoScalerConfig.class + ), + spec, + dynamicActionEmitter + ); + supervisor.start(); + autoScaler.start(); + supervisor.runInternal(); + Thread.sleep(1000); + + Assert.assertTrue( + dynamicActionEmitter + .getMetricEvents() + .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) + .stream() + .map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)) + .filter(Objects::nonNull) + .anyMatch("Already at max task count"::equals)); + + autoScaler.reset(); + autoScaler.stop(); + } + @Test public void testSeekableStreamSupervisorSpecWithNoScalingOnIdleSupervisor() throws InterruptedException { @@ -934,6 +993,65 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce autoScaler.stop(); } + @Test + public void testSeekableStreamSupervisorSpecWithScaleInAlreadyAtMin() throws InterruptedException + { + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + + StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter(); + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10) + { + @Override + public int getActiveTaskGroupsCount() + { + return 1; + } + }; + + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( + supervisor, + DATASOURCE, + mapper.convertValue( + getScaleInProperties(), + LagBasedAutoScalerConfig.class + ), + spec, + dynamicActionEmitter + ); + supervisor.start(); + autoScaler.start(); + supervisor.runInternal(); + Thread.sleep(1000); + + Assert.assertTrue( + dynamicActionEmitter + .getMetricEvents() + .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) + .stream() + .map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)) + .filter(Objects::nonNull) + .anyMatch("Already at min task count"::equals)); + + autoScaler.reset(); + autoScaler.stop(); + } + @Test public void testSeekableStreamSupervisorSpecWithScaleDisable() throws InterruptedException {