diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 7fbacf4c5872..7b17c46a2708 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -194,17 +194,73 @@ existing publishing tasks and will create new tasks starting at the offsets the Seamless schema migrations can thus be achieved by simply submitting the new schema using this endpoint. -#### Shutdown Supervisor +#### Suspend Supervisor + +``` +POST /druid/indexer/v1/supervisor//suspend +``` +Suspend indexing tasks associated with a supervisor. Note that the supervisor itself will still be +operating and emitting logs and metrics, it will just ensure that no indexing tasks are running until the supervisor +is resumed. Responds with updated SupervisorSpec. + +#### Resume Supervisor + +``` +POST /druid/indexer/v1/supervisor//resume +``` +Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec. + +#### Reset Supervisor +``` +POST /druid/indexer/v1/supervisor//reset +``` +The indexing service keeps track of the latest persisted Kafka offsets in order to provide exactly-once ingestion +guarantees across tasks. Subsequent tasks must start reading from where the previous task completed in order for the +generated segments to be accepted. If the messages at the expected starting offsets are no longer available in Kafka +(typically because the message retention period has elapsed or the topic was removed and re-created) the supervisor will +refuse to start and in-flight tasks will fail. + +This endpoint can be used to clear the stored offsets which will cause the supervisor to start reading from +either the earliest or latest offsets in Kafka (depending on the value of `useEarliestOffset`). The supervisor must be +running for this endpoint to be available. After the stored offsets are cleared, the supervisor will automatically kill +and re-create any active tasks so that tasks begin reading from valid offsets. + +Note that since the stored offsets are necessary to guarantee exactly-once ingestion, resetting them with this endpoint +may cause some Kafka messages to be skipped or to be read twice. + +#### Terminate Supervisor +``` +POST /druid/indexer/v1/supervisor//terminate +``` +Terminate a supervisor and cause all associated indexing tasks managed by this supervisor to immediately stop and begin +publishing their segments. This supervisor will still exist in the metadata store and it's history may be retrieved +with the supervisor history api, but will not be listed in the 'get supervisors' api response nor can it's configuration +or status report be retrieved. The only way this supervisor can start again is by submitting a functioning supervisor +spec to the create api. + +#### Shutdown Supervisor +_Deprecated: use the equivalent 'terminate' instead_ ``` POST /druid/indexer/v1/supervisor//shutdown ``` -Note that this will cause all indexing tasks managed by this supervisor to immediately stop and begin publishing their segments. #### Get Supervisor IDs ``` GET /druid/indexer/v1/supervisor ``` -Returns a list of the currently active supervisors. +Returns a list of strings of the currently active supervisor ids. + +#### Get Supervisors +``` +GET /druid/indexer/v1/supervisor?full +``` +Returns a list of objects of the currently active supervisors. + +|Field|Type|Description| +|---|---|---| +|`id`|String|supervisor unique identifier| +|`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)| + #### Get Supervisor Spec ``` @@ -233,24 +289,6 @@ GET /druid/indexer/v1/supervisor//history ``` Returns an audit history of specs for the supervisor with the provided ID. -#### Reset Supervisor -``` -POST /druid/indexer/v1/supervisor//reset -``` -The indexing service keeps track of the latest persisted Kafka offsets in order to provide exactly-once ingestion -guarantees across tasks. Subsequent tasks must start reading from where the previous task completed in order for the -generated segments to be accepted. If the messages at the expected starting offsets are no longer available in Kafka -(typically because the message retention period has elapsed or the topic was removed and re-created) the supervisor will -refuse to start and in-flight tasks will fail. - -This endpoint can be used to clear the stored offsets which will cause the supervisor to start reading from -either the earliest or latest offsets in Kafka (depending on the value of `useEarliestOffset`). The supervisor must be -running for this endpoint to be available. After the stored offsets are cleared, the supervisor will automatically kill -and re-create any active tasks so that tasks begin reading from valid offsets. - -Note that since the stored offsets are necessary to guarantee exactly-once ingestion, resetting them with this endpoint -may cause some Kafka messages to be skipped or to be read twice. - ## Capacity Planning Kafka indexing tasks run on middle managers and are thus limited by the resources available in the middle manager diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 56afe3efd5ef..d4e7f08d6576 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -135,31 +135,7 @@ public void start() exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(supervisorId)); final Duration delay = config.getTaskCheckDuration().toStandardDuration(); future = exec.scheduleWithFixedDelay( - new Runnable() { - @Override - public void run() - { - try { - DataSourceMetadata metadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource); - if (metadata instanceof DerivativeDataSourceMetadata - && spec.getBaseDataSource().equals(((DerivativeDataSourceMetadata) metadata).getBaseDataSource()) - && spec.getDimensions().equals(((DerivativeDataSourceMetadata) metadata).getDimensions()) - && spec.getMetrics().equals(((DerivativeDataSourceMetadata) metadata).getMetrics())) { - checkSegmentsAndSubmitTasks(); - } else { - log.error( - "Failed to start %s. Metadata in database(%s) is different from new dataSource metadata(%s)", - supervisorId, - metadata, - spec - ); - } - } - catch (Exception e) { - log.makeAlert(e, StringUtils.format("uncaught exception in %s.", supervisorId)).emit(); - } - } - }, + MaterializedViewSupervisor.this::run, 0, delay.getMillis(), TimeUnit.MILLISECONDS @@ -167,7 +143,40 @@ public void run() started = true; } } - + + @VisibleForTesting + public void run() + { + try { + if (spec.isSuspended()) { + log.info( + "Materialized view supervisor[%s:%s] is suspended", + spec.getId(), + spec.getDataSourceName() + ); + return; + } + + DataSourceMetadata metadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource); + if (metadata instanceof DerivativeDataSourceMetadata + && spec.getBaseDataSource().equals(((DerivativeDataSourceMetadata) metadata).getBaseDataSource()) + && spec.getDimensions().equals(((DerivativeDataSourceMetadata) metadata).getDimensions()) + && spec.getMetrics().equals(((DerivativeDataSourceMetadata) metadata).getMetrics())) { + checkSegmentsAndSubmitTasks(); + } else { + log.error( + "Failed to start %s. Metadata in database(%s) is different from new dataSource metadata(%s)", + supervisorId, + metadata, + spec + ); + } + } + catch (Exception e) { + log.makeAlert(e, StringUtils.format("uncaught exception in %s.", supervisorId)).emit(); + } + } + @Override public void stop(boolean stopGracefully) { @@ -207,7 +216,8 @@ public SupervisorReport getStatus() { return new MaterializedViewSupervisorReport( dataSource, - DateTimes.nowUtc(), + DateTimes.nowUtc(), + spec.isSuspended(), spec.getBaseDataSource(), spec.getDimensions(), spec.getMetrics(), diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java index 107354fe08f1..f05f8e0f6d1e 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java @@ -29,10 +29,11 @@ public class MaterializedViewSupervisorReport extends SupervisorReport { - + public MaterializedViewSupervisorReport( String dataSource, DateTime generationTime, + boolean suspended, String baseDataSource, Set dimensions, Set metrics, @@ -42,6 +43,7 @@ public MaterializedViewSupervisorReport( super(dataSource, generationTime, "{" + "dataSource='" + dataSource + '\'' + ", baseDataSource='" + baseDataSource + '\'' + + ", suspended='" + suspended + "\'" + ", dimensions=" + dimensions + ", metrics=" + metrics + ", missTimeline" + Sets.newHashSet(missTimeline) + diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index b81d85e297ac..29904a476357 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -81,6 +81,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec private final MaterializedViewTaskConfig config; private final AuthorizerMapper authorizerMapper; private final ChatHandlerProvider chatHandlerProvider; + private final boolean suspended; public MaterializedViewSupervisorSpec( @JsonProperty("baseDataSource") String baseDataSource, @@ -92,6 +93,7 @@ public MaterializedViewSupervisorSpec( @JsonProperty("hadoopDependencyCoordinates") List hadoopDependencyCoordinates, @JsonProperty("classpathPrefix") String classpathPrefix, @JsonProperty("context") Map context, + @JsonProperty("suspended") Boolean suspended, @JacksonInject ObjectMapper objectMapper, @JacksonInject TaskMaster taskMaster, @JacksonInject TaskStorage taskStorage, @@ -139,7 +141,8 @@ public MaterializedViewSupervisorSpec( this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.config = config; - + this.suspended = suspended != null ? suspended : false; + this.metrics = Sets.newHashSet(); for (AggregatorFactory aggregatorFactory : aggregators) { metrics.add(aggregatorFactory.getName()); @@ -305,7 +308,14 @@ public Map getContext() { return context; } - + + @Override + @JsonProperty("suspended") + public boolean isSuspended() + { + return suspended; + } + @Override public String getId() { @@ -331,7 +341,59 @@ public List getDataSources() { return ImmutableList.of(dataSourceName); } - + + @Override + public SupervisorSpec createSuspendedSpec() + { + return new MaterializedViewSupervisorSpec( + baseDataSource, + dimensionsSpec, + aggregators, + tuningConfig, + dataSourceName, + hadoopCoordinates, + hadoopDependencyCoordinates, + classpathPrefix, + context, + true, + objectMapper, + taskMaster, + taskStorage, + metadataSupervisorManager, + segmentManager, + metadataStorageCoordinator, + config, + authorizerMapper, + chatHandlerProvider + ); + } + + @Override + public SupervisorSpec createRunningSpec() + { + return new MaterializedViewSupervisorSpec( + baseDataSource, + dimensionsSpec, + aggregators, + tuningConfig, + dataSourceName, + hadoopCoordinates, + hadoopDependencyCoordinates, + classpathPrefix, + context, + false, + objectMapper, + taskMaster, + taskStorage, + metadataSupervisorManager, + segmentManager, + metadataStorageCoordinator, + config, + authorizerMapper, + chatHandlerProvider + ); + } + @Override public String toString() { diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java index b9c816083f80..40da151f1d47 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java @@ -75,6 +75,7 @@ public void setup() .addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider()) ); } + @Test public void testSupervisorSerialization() throws IOException { @@ -132,6 +133,7 @@ public void testSupervisorSerialization() throws IOException null, null, null, + false, objectMapper, null, null, @@ -150,6 +152,51 @@ public void testSupervisorSerialization() throws IOException Assert.assertEquals(expected.getMetrics(), spec.getMetrics()); } + @Test + public void testSuspendResuume() throws IOException + { + String supervisorStr = "{\n" + + " \"type\" : \"derivativeDataSource\",\n" + + " \"baseDataSource\": \"wikiticker\",\n" + + " \"dimensionsSpec\":{\n" + + " \"dimensions\" : [\n" + + " \"isUnpatrolled\",\n" + + " \"metroCode\",\n" + + " \"namespace\",\n" + + " \"page\",\n" + + " \"regionIsoCode\",\n" + + " \"regionName\",\n" + + " \"user\"\n" + + " ]\n" + + " },\n" + + " \"metricsSpec\" : [\n" + + " {\n" + + " \"name\" : \"count\",\n" + + " \"type\" : \"count\"\n" + + " },\n" + + " {\n" + + " \"name\" : \"added\",\n" + + " \"type\" : \"longSum\",\n" + + " \"fieldName\" : \"added\"\n" + + " }\n" + + " ],\n" + + " \"tuningConfig\": {\n" + + " \"type\" : \"hadoop\"\n" + + " }\n" + + "}"; + + MaterializedViewSupervisorSpec spec = objectMapper.readValue(supervisorStr, MaterializedViewSupervisorSpec.class); + Assert.assertFalse(spec.isSuspended()); + + String suspendedSerialized = objectMapper.writeValueAsString(spec.createSuspendedSpec()); + MaterializedViewSupervisorSpec suspendedSpec = objectMapper.readValue(suspendedSerialized, MaterializedViewSupervisorSpec.class); + Assert.assertTrue(suspendedSpec.isSuspended()); + + String runningSerialized = objectMapper.writeValueAsString(spec.createRunningSpec()); + MaterializedViewSupervisorSpec runningSpec = objectMapper.readValue(runningSerialized, MaterializedViewSupervisorSpec.class); + Assert.assertFalse(runningSpec.isSuspended()); + } + @Test public void testEmptyBaseDataSource() throws Exception { @@ -182,6 +229,7 @@ public void testEmptyBaseDataSource() throws Exception null, null, null, + false, objectMapper, null, null, @@ -226,6 +274,7 @@ public void testNullBaseDataSource() throws Exception null, null, null, + false, objectMapper, null, null, diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java index a48f76fa9a4f..4ece1f0c9263 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java @@ -47,6 +47,9 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import static org.easymock.EasyMock.expect; + +import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; @@ -109,6 +112,7 @@ public void setUp() throws IOException null, null, null, + false, objectMapper, taskMaster, taskStorage, @@ -121,9 +125,9 @@ public void setUp() throws IOException ); supervisor = (MaterializedViewSupervisor) spec.createSupervisor(); } - + @Test - public void testCheckSegments() throws IOException + public void testCheckSegments() throws IOException { Set baseSegments = Sets.newHashSet( new DataSegment( @@ -156,7 +160,7 @@ public void testCheckSegments() throws IOException Pair, Map>> toBuildInterval = supervisor.checkSegments(); Map> expectedSegments = Maps.newHashMap(); expectedSegments.put( - Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), + Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), Collections.singletonList( new DataSegment( "base", @@ -173,4 +177,43 @@ public void testCheckSegments() throws IOException ); Assert.assertEquals(expectedSegments, toBuildInterval.rhs); } + + + @Test + public void testSuspendedDoesntRun() throws IOException + { + MaterializedViewSupervisorSpec suspended = new MaterializedViewSupervisorSpec( + "base", + new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim")), null, null), + new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")}, + HadoopTuningConfig.makeDefaultTuningConfig(), + null, + null, + null, + null, + null, + true, + objectMapper, + taskMaster, + taskStorage, + metadataSupervisorManager, + sqlMetadataSegmentManager, + indexerMetadataStorageCoordinator, + new MaterializedViewTaskConfig(), + createMock(AuthorizerMapper.class), + createMock(ChatHandlerProvider.class) + ); + MaterializedViewSupervisor supervisor = (MaterializedViewSupervisor) suspended.createSupervisor(); + + // mock IndexerSQLMetadataStorageCoordinator to ensure that getDataSourceMetadata is not called + // which will be true if truly suspended, since this is the first operation of the 'run' method otherwise + IndexerSQLMetadataStorageCoordinator mock = createMock(IndexerSQLMetadataStorageCoordinator.class); + expect(mock.getDataSourceMetadata(suspended.getDataSourceName())).andAnswer((IAnswer) () -> { + Assert.fail(); + return null; + }).anyTimes(); + + EasyMock.replay(mock); + supervisor.run(); + } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 78652a70b919..5087eef33c25 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -309,14 +309,8 @@ public TaskLocation getTaskLocation(final String id) Optional taskRunner = taskMaster.getTaskRunner(); if (taskRunner.isPresent()) { Optional item = Iterables.tryFind( - taskRunner.get().getRunningTasks(), new Predicate() - { - @Override - public boolean apply(TaskRunnerWorkItem taskRunnerWorkItem) - { - return id.equals(taskRunnerWorkItem.getTaskId()); - } - } + taskRunner.get().getRunningTasks(), + (Predicate) taskRunnerWorkItem -> id.equals(taskRunnerWorkItem.getTaskId()) ); if (item.isPresent()) { @@ -372,29 +366,24 @@ public void start() consumer = getKafkaConsumer(); exec.submit( - new Runnable() - { - @Override - public void run() - { - try { - while (!Thread.currentThread().isInterrupted()) { - final Notice notice = notices.take(); - - try { - notice.handle(); - } - catch (Throwable e) { - log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource) - .addData("noticeClass", notice.getClass().getSimpleName()) - .emit(); - } + () -> { + try { + while (!Thread.currentThread().isInterrupted()) { + final Notice notice = notices.take(); + + try { + notice.handle(); + } + catch (Throwable e) { + log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource) + .addData("noticeClass", notice.getClass().getSimpleName()) + .emit(); } - } - catch (InterruptedException e) { - log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource); } } + catch (InterruptedException e) { + log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource); + } } ); firstRunTime = DateTimes.nowUtc().plus(ioConfig.getStartDelay()); @@ -898,7 +887,16 @@ void runInternal() throws ExecutionException, InterruptedException, TimeoutExcep checkTaskDuration(); checkPendingCompletionTasks(); checkCurrentTaskState(); - createNewTasks(); + + // if supervisor is not suspended, ensure required tasks are running + // if suspended, ensure tasks have been requested to gracefully stop + if (!spec.isSuspended()) { + log.info("[%s] supervisor is running.", dataSource); + createNewTasks(); + } else { + log.info("[%s] supervisor is suspended.", dataSource); + gracefulShutdownInternal(); + } if (log.isDebugEnabled()) { log.debug(generateReport(true).toString()); @@ -2096,7 +2094,8 @@ private SupervisorReport generateReport(boolean in includeOffsets ? latestOffsetsFromKafka : null, includeOffsets ? partitionLag : null, includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null, - includeOffsets ? offsetsLastUpdated : null + includeOffsets ? offsetsLastUpdated : null, + spec.isSuspended() ); SupervisorReport report = new SupervisorReport<>( dataSource, diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java index 2a5a4829b5ac..d9533a37fb24 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java @@ -42,6 +42,7 @@ public class KafkaSupervisorReportPayload private final Map minimumLag; private final Long aggregateLag; private final DateTime offsetsLastUpdated; + private final boolean suspended; public KafkaSupervisorReportPayload( String dataSource, @@ -52,7 +53,8 @@ public KafkaSupervisorReportPayload( @Nullable Map latestOffsets, @Nullable Map minimumLag, @Nullable Long aggregateLag, - @Nullable DateTime offsetsLastUpdated + @Nullable DateTime offsetsLastUpdated, + boolean suspended ) { this.dataSource = dataSource; @@ -66,6 +68,7 @@ public KafkaSupervisorReportPayload( this.minimumLag = minimumLag; this.aggregateLag = aggregateLag; this.offsetsLastUpdated = offsetsLastUpdated; + this.suspended = suspended; } public void addTask(TaskReportData data) @@ -148,6 +151,12 @@ public DateTime getOffsetsLastUpdated() return offsetsLastUpdated; } + @JsonProperty + public boolean getSuspended() + { + return suspended; + } + @Override public String toString() { @@ -163,6 +172,7 @@ public String toString() (minimumLag != null ? ", minimumLag=" + minimumLag : "") + (aggregateLag != null ? ", aggregateLag=" + aggregateLag : "") + (offsetsLastUpdated != null ? ", offsetsLastUpdated=" + offsetsLastUpdated : "") + + ", suspended=" + suspended + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index fc620ea61acb..28dab02c56e2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -55,6 +55,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec private final ServiceEmitter emitter; private final DruidMonitorSchedulerConfig monitorSchedulerConfig; private final RowIngestionMetersFactory rowIngestionMetersFactory; + private final boolean suspended; @JsonCreator public KafkaSupervisorSpec( @@ -62,6 +63,7 @@ public KafkaSupervisorSpec( @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig, @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig, @JsonProperty("context") Map context, + @JsonProperty("suspended") Boolean suspended, @JacksonInject TaskStorage taskStorage, @JacksonInject TaskMaster taskMaster, @JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, @@ -111,6 +113,7 @@ public KafkaSupervisorSpec( this.emitter = emitter; this.monitorSchedulerConfig = monitorSchedulerConfig; this.rowIngestionMetersFactory = rowIngestionMetersFactory; + this.suspended = suspended != null ? suspended : false; } @JsonProperty @@ -137,6 +140,13 @@ public Map getContext() return context; } + @Override + @JsonProperty("suspended") + public boolean isSuspended() + { + return suspended; + } + public ServiceEmitter getEmitter() { return emitter; @@ -182,4 +192,35 @@ public String toString() ", ioConfig=" + ioConfig + '}'; } + + @Override + public KafkaSupervisorSpec createSuspendedSpec() + { + return toggleSuspend(true); + } + + @Override + public KafkaSupervisorSpec createRunningSpec() + { + return toggleSuspend(false); + } + + private KafkaSupervisorSpec toggleSuspend(boolean suspend) + { + return new KafkaSupervisorSpec( + dataSchema, + tuningConfig, + ioConfig, + context, + suspend, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + kafkaIndexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory + ); + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java new file mode 100644 index 000000000000..b4c6dfd29972 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka.supervisor; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; +import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory; +import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; +import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class KafkaSupervisorSpecTest +{ + private final ObjectMapper mapper; + + public KafkaSupervisorSpecTest() + { + mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std() + .addValue(TaskStorage.class, null) + .addValue(TaskMaster.class, null) + .addValue(IndexerMetadataStorageCoordinator.class, null) + .addValue(KafkaIndexTaskClientFactory.class, null) + .addValue(ObjectMapper.class, mapper) + .addValue(ServiceEmitter.class, new NoopServiceEmitter()) + .addValue(DruidMonitorSchedulerConfig.class, null) + .addValue(RowIngestionMetersFactory.class, null) + .addValue(ExprMacroTable.class.getName(), LookupEnabledTestExprMacroTable.INSTANCE) + ); + mapper.registerModules((Iterable) new KafkaIndexTaskModule().getJacksonModules()); + } + + @Test + public void testSerde() throws IOException + { + String json = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topic\": \"metrics\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class); + + Assert.assertNotNull(spec); + Assert.assertNotNull(spec.getDataSchema()); + Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); + Assert.assertNotNull(spec.getIoConfig()); + Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNotNull(spec.getTuningConfig()); + Assert.assertNull(spec.getContext()); + Assert.assertFalse(spec.isSuspended()); + String serialized = mapper.writeValueAsString(spec); + + // expect default values populated in reserialized string + Assert.assertTrue(serialized.contains("\"tuningConfig\":{")); + Assert.assertTrue(serialized.contains("\"indexSpec\":{")); + Assert.assertTrue(serialized.contains("\"suspended\":false")); + + KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); + + String stable = mapper.writeValueAsString(spec2); + + Assert.assertEquals(serialized, stable); + } + + @Test + public void testSuspendResume() throws IOException + { + String json = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topic\": \"metrics\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class); + + Assert.assertNotNull(spec); + Assert.assertNotNull(spec.getDataSchema()); + Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); + Assert.assertNotNull(spec.getIoConfig()); + Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNotNull(spec.getTuningConfig()); + Assert.assertNull(spec.getContext()); + Assert.assertFalse(spec.isSuspended()); + + String suspendedSerialized = mapper.writeValueAsString(spec.createSuspendedSpec()); + + // expect default values populated in reserialized string + Assert.assertTrue(suspendedSerialized.contains("\"tuningConfig\":{")); + Assert.assertTrue(suspendedSerialized.contains("\"indexSpec\":{")); + Assert.assertTrue(suspendedSerialized.contains("\"suspended\":true")); + + KafkaSupervisorSpec suspendedSpec = mapper.readValue(suspendedSerialized, KafkaSupervisorSpec.class); + + Assert.assertTrue(suspendedSpec.isSuspended()); + + String runningSerialized = mapper.writeValueAsString(spec.createRunningSpec()); + + KafkaSupervisorSpec runningSpec = mapper.readValue(runningSerialized, KafkaSupervisorSpec.class); + + Assert.assertFalse(runningSpec.isSuspended()); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 964f5f7cac6f..978de2f6886f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -79,6 +79,7 @@ import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; +import org.easymock.IAnswer; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Period; @@ -2293,6 +2294,149 @@ public void testCheckpointWithNullTaskGroupId() verifyAll(); } + @Test + public void testSuspendedNoRunningTasks() throws Exception + { + supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true); + addSomeEvents(1); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + // this asserts that taskQueue.add does not in fact get called because supervisor should be suspended + expect(taskQueue.add(anyObject())).andAnswer((IAnswer) () -> { + Assert.fail(); + return null; + }).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + } + + @Test + public void testSuspendedRunningTasks() throws Exception + { + // graceful shutdown is expected to be called on running tasks since state is suspended + + final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); + final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final DateTime startTime = DateTimes.nowUtc(); + + supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false, true); + addSomeEvents(1); + + Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + 0, + new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + 0, + new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + Collection workItems = new ArrayList<>(); + workItems.add(new TestTaskRunnerWorkItem(id1, null, location1)); + workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING)); + expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); + + // getCheckpoints will not be called for id1 as it is in publishing state + TreeMap> checkpoints = new TreeMap<>(); + checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + + expect(taskClient.pauseAsync("id2")) + .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 15L, 1, 25L, 2, 30L))); + expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true)) + .andReturn(Futures.immediateFuture(true)); + taskQueue.shutdown("id3"); + expectLastCall().times(2); + + replayAll(); + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + } + + @Test + public void testResetSuspended() throws Exception + { + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true); + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + + reset(indexerMetadataStorageCoordinator); + expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); + replay(indexerMetadataStorageCoordinator); + + supervisor.resetInternal(null); + verifyAll(); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { @@ -2320,6 +2464,29 @@ private KafkaSupervisor getSupervisor( Period earlyMessageRejectionPeriod, boolean skipOffsetGaps ) + { + return getSupervisor( + replicas, + taskCount, + useEarliestOffset, + duration, + lateMessageRejectionPeriod, + earlyMessageRejectionPeriod, + skipOffsetGaps, + false + ); + } + + private KafkaSupervisor getSupervisor( + int replicas, + int taskCount, + boolean useEarliestOffset, + String duration, + Period lateMessageRejectionPeriod, + Period earlyMessageRejectionPeriod, + boolean skipOffsetGaps, + boolean suspended + ) { KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, @@ -2368,6 +2535,7 @@ public KafkaIndexTaskClient build( tuningConfig, kafkaSupervisorIOConfig, null, + suspended, taskStorage, taskMaster, indexerMetadataStorageCoordinator, @@ -2476,7 +2644,7 @@ public String getTaskType() } @Override - public String getDataSource() + public String getDataSource() { return dataSource; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index b5c24ef913fe..cca6fe51bd84 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -91,6 +91,19 @@ public boolean stopAndRemoveSupervisor(String id) } } + public boolean suspendOrResumeSupervisor(String id, boolean suspend) + { + Preconditions.checkState(started, "SupervisorManager not started"); + Pair pair = supervisors.get(id); + Preconditions.checkNotNull(pair.lhs, "spec"); + synchronized (lock) { + Preconditions.checkState(started, "SupervisorManager not started"); + SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : pair.rhs.createRunningSpec(); + possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false); + return createAndStartSupervisorInternal(nextState, true); + } + } + @LifecycleStart public void start() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 9be9c9fb3b57..d3e19bbb4aba 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -44,6 +44,7 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -52,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * Endpoints for submitting and starting a {@link SupervisorSpec}, getting running supervisors, stopping supervisors, @@ -90,49 +92,55 @@ public SupervisorResource(TaskMaster taskMaster, AuthorizerMapper authorizerMapp public Response specPost(final SupervisorSpec spec, @Context final HttpServletRequest req) { return asLeaderWithSupervisorManager( - new Function() - { - @Override - public Response apply(SupervisorManager manager) - { - Preconditions.checkArgument( - spec.getDataSources() != null && spec.getDataSources().size() > 0, - "No dataSources found to perform authorization checks" - ); - - Access authResult = AuthorizationUtils.authorizeAllResourceActions( - req, - Iterables.transform(spec.getDataSources(), AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR), - authorizerMapper - ); + manager -> { + Preconditions.checkArgument( + spec.getDataSources() != null && spec.getDataSources().size() > 0, + "No dataSources found to perform authorization checks" + ); - if (!authResult.isAllowed()) { - throw new ForbiddenException(authResult.toString()); - } + Access authResult = AuthorizationUtils.authorizeAllResourceActions( + req, + Iterables.transform(spec.getDataSources(), AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR), + authorizerMapper + ); - manager.createOrUpdateAndStartSupervisor(spec); - return Response.ok(ImmutableMap.of("id", spec.getId())).build(); + if (!authResult.isAllowed()) { + throw new ForbiddenException(authResult.toString()); } + + manager.createOrUpdateAndStartSupervisor(spec); + return Response.ok(ImmutableMap.of("id", spec.getId())).build(); } ); } @GET @Produces(MediaType.APPLICATION_JSON) - public Response specGetAll(@Context final HttpServletRequest req) + public Response specGetAll( + @QueryParam("full") String full, + @Context final HttpServletRequest req + ) { return asLeaderWithSupervisorManager( - new Function() - { - @Override - public Response apply(final SupervisorManager manager) - { - Set authorizedSupervisorIds = filterAuthorizedSupervisorIds( - req, - manager, - manager.getSupervisorIds() - ); + manager -> { + Set authorizedSupervisorIds = filterAuthorizedSupervisorIds( + req, + manager, + manager.getSupervisorIds() + ); + + if (full == null) { return Response.ok(authorizedSupervisorIds).build(); + } else { + List> all = + authorizedSupervisorIds.stream() + .map(x -> ImmutableMap.builder() + .put("id", x) + .put("spec", manager.getSupervisorSpec(x).get()) + .build() + ) + .collect(Collectors.toList()); + return Response.ok(all).build(); } } ); @@ -145,20 +153,15 @@ public Response apply(final SupervisorManager manager) public Response specGet(@PathParam("id") final String id) { return asLeaderWithSupervisorManager( - new Function() - { - @Override - public Response apply(SupervisorManager manager) - { - Optional spec = manager.getSupervisorSpec(id); - if (!spec.isPresent()) { - return Response.status(Response.Status.NOT_FOUND) - .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) - .build(); - } - - return Response.ok(spec.get()).build(); + manager -> { + Optional spec = manager.getSupervisorSpec(id); + if (!spec.isPresent()) { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); } + + return Response.ok(spec.get()).build(); } ); } @@ -170,20 +173,15 @@ public Response apply(SupervisorManager manager) public Response specGetStatus(@PathParam("id") final String id) { return asLeaderWithSupervisorManager( - new Function() - { - @Override - public Response apply(SupervisorManager manager) - { - Optional spec = manager.getSupervisorStatus(id); - if (!spec.isPresent()) { - return Response.status(Response.Status.NOT_FOUND) - .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) - .build(); - } - - return Response.ok(spec.get()).build(); + manager -> { + Optional spec = manager.getSupervisorStatus(id); + if (!spec.isPresent()) { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); } + + return Response.ok(spec.get()).build(); } ); } @@ -197,49 +195,66 @@ public Response getAllTaskStats( ) { return asLeaderWithSupervisorManager( - new Function() - { - @Override - public Response apply(SupervisorManager manager) - { - Optional>> stats = manager.getSupervisorStats(id); - if (!stats.isPresent()) { - return Response.status(Response.Status.NOT_FOUND) - .entity( - ImmutableMap.of( - "error", - StringUtils.format("[%s] does not exist", id) - ) - ) - .build(); - } - - return Response.ok(stats.get()).build(); + manager -> { + Optional>> stats = manager.getSupervisorStats(id); + if (!stats.isPresent()) { + return Response.status(Response.Status.NOT_FOUND) + .entity( + ImmutableMap.of( + "error", + StringUtils.format("[%s] does not exist", id) + ) + ) + .build(); } + + return Response.ok(stats.get()).build(); } ); } + @POST + @Path("/{id}/resume") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response specResume(@PathParam("id") final String id) + { + return specSuspendOrResume(id, false); + } + + @POST + @Path("/{id}/suspend") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response specSuspend(@PathParam("id") final String id) + { + return specSuspendOrResume(id, true); + } + @Deprecated @POST @Path("/{id}/shutdown") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(SupervisorResourceFilter.class) public Response shutdown(@PathParam("id") final String id) + { + return terminate(id); + } + + @POST + @Path("/{id}/terminate") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response terminate(@PathParam("id") final String id) { return asLeaderWithSupervisorManager( - new Function() - { - @Override - public Response apply(SupervisorManager manager) - { - if (manager.stopAndRemoveSupervisor(id)) { - return Response.ok(ImmutableMap.of("id", id)).build(); - } else { - return Response.status(Response.Status.NOT_FOUND) - .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) - .build(); - } + manager -> { + if (manager.stopAndRemoveSupervisor(id)) { + return Response.ok(ImmutableMap.of("id", id)).build(); + } else { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); } } ); @@ -251,21 +266,14 @@ public Response apply(SupervisorManager manager) public Response specGetAllHistory(@Context final HttpServletRequest req) { return asLeaderWithSupervisorManager( - new Function() - { - @Override - public Response apply(final SupervisorManager manager) - { - return Response.ok( - AuthorizationUtils.filterAuthorizedResources( - req, - manager.getSupervisorHistory(), - SPEC_DATASOURCE_READ_RA_GENERATOR, - authorizerMapper - ) - ).build(); - } - } + manager -> Response.ok( + AuthorizationUtils.filterAuthorizedResources( + req, + manager.getSupervisorHistory(), + SPEC_DATASOURCE_READ_RA_GENERATOR, + authorizerMapper + ) + ).build() ); } @@ -277,38 +285,33 @@ public Response specGetHistory( @PathParam("id") final String id) { return asLeaderWithSupervisorManager( - new Function() - { - @Override - public Response apply(SupervisorManager manager) - { - Map> supervisorHistory = manager.getSupervisorHistory(); - Iterable historyForId = supervisorHistory.get(id); - if (historyForId != null) { - final List authorizedHistoryForId = - Lists.newArrayList( - AuthorizationUtils.filterAuthorizedResources( - req, - historyForId, - SPEC_DATASOURCE_READ_RA_GENERATOR, - authorizerMapper - ) - ); - if (authorizedHistoryForId.size() > 0) { - return Response.ok(authorizedHistoryForId).build(); - } + manager -> { + Map> supervisorHistory = manager.getSupervisorHistory(); + Iterable historyForId = supervisorHistory.get(id); + if (historyForId != null) { + final List authorizedHistoryForId = + Lists.newArrayList( + AuthorizationUtils.filterAuthorizedResources( + req, + historyForId, + SPEC_DATASOURCE_READ_RA_GENERATOR, + authorizerMapper + ) + ); + if (authorizedHistoryForId.size() > 0) { + return Response.ok(authorizedHistoryForId).build(); } + } - return Response.status(Response.Status.NOT_FOUND) - .entity( - ImmutableMap.of( - "error", - StringUtils.format("No history for [%s].", id) - ) - ) - .build(); + return Response.status(Response.Status.NOT_FOUND) + .entity( + ImmutableMap.of( + "error", + StringUtils.format("No history for [%s].", id) + ) + ) + .build(); - } } ); } @@ -320,18 +323,13 @@ public Response apply(SupervisorManager manager) public Response reset(@PathParam("id") final String id) { return asLeaderWithSupervisorManager( - new Function() - { - @Override - public Response apply(SupervisorManager manager) - { - if (manager.resetSupervisor(id, null)) { - return Response.ok(ImmutableMap.of("id", id)).build(); - } else { - return Response.status(Response.Status.NOT_FOUND) - .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) - .build(); - } + manager -> { + if (manager.resetSupervisor(id, null)) { + return Response.ok(ImmutableMap.of("id", id)).build(); + } else { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); } } ); @@ -375,4 +373,29 @@ private Set filterAuthorizedSupervisorIds( ) ); } + + private Response specSuspendOrResume(final String id, boolean suspend) + { + return asLeaderWithSupervisorManager( + manager -> { + Optional spec = manager.getSupervisorSpec(id); + if (!spec.isPresent()) { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); + } + + if (spec.get().isSuspended() == suspend) { + final String errMsg = + StringUtils.format("[%s] is already %s", id, suspend ? "suspended" : "running"); + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", errMsg)) + .build(); + } + manager.suspendOrResumeSupervisor(id, suspend); + spec = manager.getSupervisorSpec(id); + return Response.ok(spec.get()).build(); + } + ); + } } diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js index 5ec87c588fa4..a3bf8e6a3f20 100644 --- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js +++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js @@ -6,7 +6,7 @@ var killTask = function(taskId) { if(confirm('Do you really want to kill: '+taskId)) { $.ajax({ type:'POST', - url: '/druid/indexer/v1/task/'+ taskId +'/shutdown', + url: '/druid/indexer/v1/task/'+ taskId +'/terminate', data: '' }).done(function(data) { setTimeout(function() { location.reload(true) }, 750); @@ -16,6 +16,42 @@ var killTask = function(taskId) { } } + +var suspendSupervisor = function(supervisorId) { + if(confirm('Do you really want to suspend: '+ supervisorId)) { + $.ajax({ + type:'POST', + url: '/druid/indexer/v1/supervisor/' + supervisorId + '/suspend', + data: '' + }).done(function(data) { + setTimeout(function() { location.reload(true) }, 750); + }).fail(function(data) { + var errMsg = data && data.responseJSON && data.responseJSON.error ? + data.responseJSON.error : + 'suspend request failed, please check overlord logs for details.'; + alert(errMsg); + }) + } +} + + +var resumeSupervisor = function(supervisorId) { + if(confirm('Do you really want to resume: '+ supervisorId)) { + $.ajax({ + type:'POST', + url: '/druid/indexer/v1/supervisor/' + supervisorId + '/resume', + data: '' + }).done(function(data) { + setTimeout(function() { location.reload(true) }, 750); + }).fail(function(data) { + var errMsg = data && data.responseJSON && data.responseJSON.error ? + data.responseJSON.error : + 'resume request failed, please check overlord logs for details.'; + alert(errMsg); + }) + } +} + var resetSupervisor = function(supervisorId) { if(confirm('Do you really want to reset: '+ supervisorId)) { $.ajax({ @@ -31,7 +67,7 @@ var resetSupervisor = function(supervisorId) { } var shutdownSupervisor = function(supervisorId) { - if(confirm('Do you really want to shutdown: '+ supervisorId)) { + if(confirm('Do you really want to terminate: '+ supervisorId)) { $.ajax({ type:'POST', url: '/druid/indexer/v1/supervisor/' + supervisorId + '/shutdown', @@ -39,7 +75,7 @@ var shutdownSupervisor = function(supervisorId) { }).done(function(data) { setTimeout(function() { location.reload(true) }, 750); }).fail(function(data) { - alert('Shutdown request failed, please check overlord logs for details.'); + alert('Terminate request failed, please check overlord logs for details.'); }) } } @@ -59,18 +95,28 @@ $(document).ready(function() { } } - $.get('/druid/indexer/v1/supervisor', function(dataList) { + $.get('/druid/indexer/v1/supervisor?full', function(dataList) { + var data = [] for (i = 0 ; i < dataList.length ; i++) { - var supervisorId = encodeURIComponent(dataList[i]) + var supervisorId = encodeURIComponent(dataList[i].id) + var supervisorSpec = dataList[i].spec; + var statusText = supervisorSpec && supervisorSpec.suspended ? + 'suspended' : + 'running'; data[i] = { "dataSource" : supervisorId, "more" : 'payload' + 'status' + 'history' + + (supervisorSpec.suspended ? + 'resume' : + 'suspend' + ) + 'reset' + - 'shutdown' + 'terminate', + "status": statusText } } buildTable((data), $('#supervisorsTable')); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java index abdd60a9dcde..0a977093f50c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java @@ -131,6 +131,24 @@ public List getDataSources() { return ImmutableList.of("test"); } + + @Override + public SupervisorSpec createSuspendedSpec() + { + return null; + } + + @Override + public SupervisorSpec createRunningSpec() + { + return null; + } + + @Override + public boolean isSuspended() + { + return false; + } }; EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString())) .andReturn(Optional.of(supervisorSpec)) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index f91dc1adfcb8..85cfd95c726e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -24,6 +24,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -40,6 +41,7 @@ import java.util.Map; import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.eq; @RunWith(EasyMockRunner.class) @@ -263,15 +265,114 @@ public void testResetSupervisor() verifyAll(); } + @Test + public void testCreateSuspendResumeAndStopSupervisor() + { + Capture capturedInsert = Capture.newInstance(); + SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1, false, supervisor2); + Map existingSpecs = ImmutableMap.of( + "id3", new TestSupervisorSpec("id3", supervisor3) + ); + + // mock adding a supervisor to manager with existing supervisor then suspending it + Assert.assertTrue(manager.getSupervisorIds().isEmpty()); + + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); + metadataSupervisorManager.insert("id1", spec); + supervisor3.start(); + supervisor1.start(); + replayAll(); + + manager.start(); + Assert.assertEquals(1, manager.getSupervisorIds().size()); + + manager.createOrUpdateAndStartSupervisor(spec); + Assert.assertEquals(2, manager.getSupervisorIds().size()); + Assert.assertEquals(spec, manager.getSupervisorSpec("id1").get()); + verifyAll(); + + // mock suspend, which stops supervisor1 and sets suspended state in metadata, flipping to supervisor2 + // in TestSupervisorSpec implementation of createSuspendedSpec + resetAll(); + metadataSupervisorManager.insert(eq("id1"), capture(capturedInsert)); + supervisor2.start(); + supervisor1.stop(true); + replayAll(); + + manager.suspendOrResumeSupervisor("id1", true); + Assert.assertEquals(2, manager.getSupervisorIds().size()); + Assert.assertEquals(capturedInsert.getValue(), manager.getSupervisorSpec("id1").get()); + Assert.assertTrue(capturedInsert.getValue().suspended); + verifyAll(); + + // mock resume, which stops supervisor2 and sets suspended to false in metadata, flipping to supervisor1 + // in TestSupervisorSpec implementation of createRunningSpec + resetAll(); + metadataSupervisorManager.insert(eq("id1"), capture(capturedInsert)); + supervisor2.stop(true); + supervisor1.start(); + replayAll(); + + manager.suspendOrResumeSupervisor("id1", false); + Assert.assertEquals(2, manager.getSupervisorIds().size()); + Assert.assertEquals(capturedInsert.getValue(), manager.getSupervisorSpec("id1").get()); + Assert.assertFalse(capturedInsert.getValue().suspended); + verifyAll(); + + // mock stop of suspended then resumed supervisor + resetAll(); + metadataSupervisorManager.insert(eq("id1"), anyObject(NoopSupervisorSpec.class)); + supervisor1.stop(true); + replayAll(); + + boolean retVal = manager.stopAndRemoveSupervisor("id1"); + Assert.assertTrue(retVal); + Assert.assertEquals(1, manager.getSupervisorIds().size()); + Assert.assertEquals(Optional.absent(), manager.getSupervisorSpec("id1")); + verifyAll(); + + // mock manager shutdown to ensure supervisor 3 stops + resetAll(); + supervisor3.stop(false); + replayAll(); + + manager.stop(); + verifyAll(); + + Assert.assertTrue(manager.getSupervisorIds().isEmpty()); + } + + private static class TestSupervisorSpec implements SupervisorSpec { private final String id; private final Supervisor supervisor; + private final boolean suspended; + private final Supervisor suspendedSupervisor; + public TestSupervisorSpec(String id, Supervisor supervisor) + { + this(id, supervisor, false, null); + } + + public TestSupervisorSpec(String id, Supervisor supervisor, boolean suspended, Supervisor suspendedSupervisor) { this.id = id; this.supervisor = supervisor; + this.suspended = suspended; + this.suspendedSupervisor = suspendedSupervisor; + } + @Override + public SupervisorSpec createSuspendedSpec() + { + return new TestSupervisorSpec(id, suspendedSupervisor, true, supervisor); + } + + @Override + public SupervisorSpec createRunningSpec() + { + return new TestSupervisorSpec(id, suspendedSupervisor, false, supervisor); } @Override @@ -286,11 +387,16 @@ public Supervisor createSupervisor() return supervisor; } + @Override + public boolean isSuspended() + { + return suspended; + } + @Override public List getDataSources() { return new ArrayList<>(); } - } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index a3434bc45181..9d6eab33e1e7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -29,12 +29,10 @@ import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.server.security.Access; -import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.AuthorizerMapper; -import org.apache.druid.server.security.Resource; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -75,21 +73,14 @@ public void setUp() @Override public Authorizer getAuthorizer(String name) { - return new Authorizer() - { - @Override - public Access authorize( - AuthenticationResult authenticationResult, Resource resource, Action action - ) - { - if (authenticationResult.getIdentity().equals("druid")) { - return Access.OK; + return (authenticationResult, resource, action) -> { + if (authenticationResult.getIdentity().equals("druid")) { + return Access.OK; + } else { + if (resource.getName().equals("datasource2")) { + return new Access(false, "not authorized."); } else { - if (resource.getName().equals("datasource2")) { - return new Access(false, "not authorized."); - } else { - return Access.OK; - } + return Access.OK; } } }; @@ -171,7 +162,7 @@ public List getDataSources() EasyMock.expectLastCall().anyTimes(); replayAll(); - Response response = supervisorResource.specGetAll(request); + Response response = supervisorResource.specGetAll(null, request); verifyAll(); Assert.assertEquals(200, response.getStatus()); @@ -181,12 +172,61 @@ public List getDataSources() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); replayAll(); - response = supervisorResource.specGetAll(request); + response = supervisorResource.specGetAll(null, request); verifyAll(); Assert.assertEquals(503, response.getStatus()); } + @Test + public void testSpecGetAllFull() + { + Set supervisorIds = ImmutableSet.of("id1", "id2"); + + SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null) { + + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null) { + + @Override + public List getDataSources() + { + return Collections.singletonList("datasource2"); + } + }; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce(); + EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(2); + EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(2); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( + new AuthenticationResult("druid", "druid", null, null) + ).atLeastOnce(); + request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); + EasyMock.expectLastCall().anyTimes(); + replayAll(); + + Response response = supervisorResource.specGetAll("", request); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + List> specs = (List>) response.getEntity(); + Assert.assertTrue( + specs.stream() + .allMatch(spec -> + ("id1".equals(spec.get("id")) && spec1.equals(spec.get("spec"))) || + ("id2".equals(spec.get("id")) && spec2.equals(spec.get("spec"))) + ) + ); + } + @Test public void testSpecGet() { @@ -249,6 +289,101 @@ public void testSpecGetStatus() Assert.assertEquals(503, response.getStatus()); } + @Test + public void testSpecSuspend() + { + + TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) + .andReturn(Optional.of(running)).times(1) + .andReturn(Optional.of(suspended)).times(1); + EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", true)).andReturn(true); + EasyMock.expectLastCall().anyTimes(); + replayAll(); + + Response response = supervisorResource.specSuspend("my-id"); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + TestSupervisorSpec responseSpec = (TestSupervisorSpec) response.getEntity(); + Assert.assertEquals(suspended.id, responseSpec.id); + Assert.assertEquals(suspended.suspended, responseSpec.suspended); + resetAll(); + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)).atLeastOnce(); + replayAll(); + + response = supervisorResource.specSuspend("my-id"); + verifyAll(); + + Assert.assertEquals(400, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already suspended"), response.getEntity()); + } + + + + @Test + public void testSpecResume() + { + TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) + .andReturn(Optional.of(suspended)).times(1) + .andReturn(Optional.of(running)).times(1); + EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", false)).andReturn(true); + EasyMock.expectLastCall().anyTimes(); + replayAll(); + + Response response = supervisorResource.specResume("my-id"); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + TestSupervisorSpec responseSpec = (TestSupervisorSpec) response.getEntity(); + Assert.assertEquals(running.id, responseSpec.id); + Assert.assertEquals(running.suspended, responseSpec.suspended); + resetAll(); + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)).atLeastOnce(); + replayAll(); + + response = supervisorResource.specResume("my-id"); + verifyAll(); + + Assert.assertEquals(400, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already running"), response.getEntity()); + } + @Test public void testShutdown() { @@ -762,9 +897,10 @@ public void testNoopSupervisorSpecSerde() throws Exception private static class TestSupervisorSpec implements SupervisorSpec { - private final String id; - private final Supervisor supervisor; - private final List datasources; + protected final String id; + protected final Supervisor supervisor; + protected final List datasources; + boolean suspended; public TestSupervisorSpec(String id, Supervisor supervisor, List datasources) { @@ -773,6 +909,12 @@ public TestSupervisorSpec(String id, Supervisor supervisor, List datasou this.datasources = datasources; } + public TestSupervisorSpec(String id, Supervisor supervisor, List datasources, boolean suspended) + { + this(id, supervisor, datasources); + this.suspended = suspended; + } + @Override public String getId() { @@ -791,6 +933,25 @@ public List getDataSources() return datasources; } + + @Override + public SupervisorSpec createSuspendedSpec() + { + return new TestSupervisorSpec(id, supervisor, datasources, true); + } + + @Override + public SupervisorSpec createRunningSpec() + { + return new TestSupervisorSpec(id, supervisor, datasources, false); + } + + @Override + public boolean isSuspended() + { + return suspended; + } + @Override public boolean equals(Object o) { @@ -809,7 +970,10 @@ public boolean equals(Object o) if (supervisor != null ? !supervisor.equals(that.supervisor) : that.supervisor != null) { return false; } - return datasources != null ? datasources.equals(that.datasources) : that.datasources == null; + if (datasources != null ? !datasources.equals(that.datasources) : that.datasources != null) { + return false; + } + return isSuspended() == that.isSuspended(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index c32ea7af0b24..6935a62f5457 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import org.apache.druid.indexing.overlord.DataSourceMetadata; import javax.annotation.Nullable; @@ -44,14 +45,28 @@ public class NoopSupervisorSpec implements SupervisorSpec @JsonProperty("id") private String id; + @JsonProperty("suspended") + private boolean suspended; //ignored + + @VisibleForTesting + public NoopSupervisorSpec( + String id, + List datasources + ) + { + this(id, datasources, null); + } + @JsonCreator public NoopSupervisorSpec( @Nullable @JsonProperty("id") String id, - @Nullable @JsonProperty("dataSources") List datasources + @Nullable @JsonProperty("dataSources") List datasources, + @Nullable @JsonProperty("suspended") Boolean suspended ) { this.id = id; this.datasources = datasources == null ? new ArrayList<>() : datasources; + this.suspended = false; // ignore } @Override @@ -61,6 +76,22 @@ public String getId() return id; } + + @Override + @Nullable + @JsonProperty("dataSources") + public List getDataSources() + { + return datasources; + } + + @Override + @JsonProperty("suspended") + public boolean isSuspended() + { + return suspended; + } + @Override public Supervisor createSupervisor() { @@ -95,11 +126,15 @@ public void checkpoint( } @Override - @Nullable - @JsonProperty("dataSources") - public List getDataSources() + public SupervisorSpec createRunningSpec() { - return datasources; + return new NoopSupervisorSpec(id, datasources); + } + + @Override + public SupervisorSpec createSuspendedSpec() + { + return new NoopSupervisorSpec(id, datasources); } @Override diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index 01bcb4cea561..56717691fde0 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; import java.util.List; @@ -41,4 +42,19 @@ public interface SupervisorSpec Supervisor createSupervisor(); List getDataSources(); + + default SupervisorSpec createSuspendedSpec() + { + throw new NotImplementedException(); + } + + default SupervisorSpec createRunningSpec() + { + throw new NotImplementedException(); + } + + default boolean isSuspended() + { + return false; + } }