getCompactionConfigs()
+ {
+ return compactionConfigs;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CompactionConfigsResponse that = (CompactionConfigsResponse) o;
+ return Objects.equals(compactionConfigs, that.compactionConfigs);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(compactionConfigs);
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java
index b29cefef83c7..057de1aef233 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java
@@ -19,110 +19,332 @@
package org.apache.druid.indexing.overlord.http;
+import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
-import org.apache.druid.error.DruidException;
+import org.apache.druid.audit.AuditInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.InternalServerError;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.NotFound;
import org.apache.druid.indexing.compact.CompactionScheduler;
-import org.apache.druid.server.compaction.CompactionProgressResponse;
+import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.CompactionSupervisorManager;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorResource;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.CoordinatorConfigManager;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry;
import org.apache.druid.server.http.ServletResourceUtils;
-import org.apache.druid.server.http.security.StateResourceFilter;
+import org.apache.druid.server.http.security.ConfigResourceFilter;
+import org.apache.druid.server.http.security.DatasourceResourceFilter;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.joda.time.Interval;
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
/**
- * Contains the same logic as {@code CoordinatorCompactionResource} but the APIs
- * are served by {@link CompactionScheduler} instead of {@code DruidCoordinator}.
+ * New compaction APIs exposed by the Overlord.
+ *
+ * If {@link #isCompactionSupervisorEnabled()} is true, then the APIs are served
+ * by the Overlord locally, either using the {@link CompactionScheduler} or the
+ * {@link SupervisorResource}. Otherwise, the APIs are redirected to the
+ * coordinator.
*/
@Path("/druid/indexer/v1/compaction")
public class OverlordCompactionResource
{
private final CompactionScheduler scheduler;
+ private final AuthorizerMapper authorizerMapper;
+ private final CoordinatorClient coordinatorClient;
+ private final CoordinatorConfigManager configManager;
+ private final CompactionSupervisorManager supervisorManager;
@Inject
public OverlordCompactionResource(
- CompactionScheduler scheduler
+ CompactionScheduler scheduler,
+ AuthorizerMapper authorizerMapper,
+ CoordinatorClient coordinatorClient,
+ CoordinatorConfigManager configManager,
+ CompactionSupervisorManager supervisorManager
)
{
this.scheduler = scheduler;
+ this.configManager = configManager;
+ this.authorizerMapper = authorizerMapper;
+ this.coordinatorClient = coordinatorClient;
+ this.supervisorManager = supervisorManager;
}
@GET
@Path("/isSupervisorEnabled")
@Produces(MediaType.APPLICATION_JSON)
- @ResourceFilters(StateResourceFilter.class)
+ @ResourceFilters(ConfigResourceFilter.class)
public Response isCompactionSupervisorEnabled()
{
- return Response.ok(scheduler.isEnabled()).build();
+ return ServletResourceUtils.buildReadResponse(
+ scheduler::isEnabled
+ );
+ }
+
+ @POST
+ @Path("/config/cluster")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(ConfigResourceFilter.class)
+ public Response updateClusterCompactionConfig(
+ ClusterCompactionConfig updatePayload,
+ @Context HttpServletRequest req
+ )
+ {
+ final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req);
+ return ServletResourceUtils.buildUpdateResponse(
+ () -> configManager.updateClusterCompactionConfig(updatePayload, auditInfo)
+ );
+ }
+
+ @GET
+ @Path("/config/cluster")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(ConfigResourceFilter.class)
+ public Response getClusterCompactionConfig()
+ {
+ return ServletResourceUtils.buildReadResponse(
+ configManager::getClusterCompactionConfig
+ );
}
@GET
- @Path("/progress")
+ @Path("/status/datasources")
@Produces(MediaType.APPLICATION_JSON)
- @ResourceFilters(StateResourceFilter.class)
- public Response getCompactionProgress(
- @QueryParam("dataSource") String dataSource
+ public Response getAllCompactionSnapshots(
+ @Context HttpServletRequest request
)
{
- if (!scheduler.isEnabled()) {
- return buildErrorResponseIfSchedulerDisabled();
+ if (scheduler.isEnabled()) {
+ return ServletResourceUtils.buildReadResponse(() -> {
+ final List<AutoCompactionSnapshot> allSnapshots =
+ List.copyOf(scheduler.getAllCompactionSnapshots().values());
+ return new CompactionStatusResponse(
+ AuthorizationUtils.filterByAuthorizedDatasources(
+ request,
+ allSnapshots,
+ AutoCompactionSnapshot::getDataSource,
+ authorizerMapper
+ )
+ );
+ });
+ } else {
+ return buildResponse(coordinatorClient.getCompactionSnapshots(null));
}
+ }
- if (dataSource == null || dataSource.isEmpty()) {
- return ServletResourceUtils.buildErrorResponseFrom(InvalidInput.exception("No DataSource specified"));
+ @GET
+ @Path("/status/datasources/{dataSource}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(DatasourceResourceFilter.class)
+ public Response getDatasourceCompactionSnapshot(
+ @PathParam("dataSource") String dataSource
+ )
+ {
+ if (isEmpty(dataSource)) {
+ return invalidInputResponse("No DataSource specified");
}
- final AutoCompactionSnapshot snapshot = scheduler.getCompactionSnapshot(dataSource);
- if (snapshot == null) {
- return ServletResourceUtils.buildErrorResponseFrom(NotFound.exception("Unknown DataSource"));
+ if (scheduler.isEnabled()) {
+ AutoCompactionSnapshot snapshot = scheduler.getCompactionSnapshot(dataSource);
+ if (snapshot == null) {
+ return ServletResourceUtils.buildErrorResponseFrom(NotFound.exception("Unknown DataSource"));
+ } else {
+ return Response.ok(snapshot).build();
+ }
} else {
- return Response.ok(new CompactionProgressResponse(snapshot.getBytesAwaitingCompaction()))
- .build();
+ return buildResponse(
+ Futures.transform(
+ coordinatorClient.getCompactionSnapshots(dataSource),
+ statusResponse -> Iterators.getOnlyElement(statusResponse.getLatestStatus().iterator()),
+ MoreExecutors.directExecutor()
+ )
+ );
}
}
@GET
- @Path("/status")
+ @Path("/config/datasources")
@Produces(MediaType.APPLICATION_JSON)
- @ResourceFilters(StateResourceFilter.class)
- public Response getCompactionSnapshots(
- @QueryParam("dataSource") String dataSource
+ public Response getAllCompactionConfigs(
+ @Context HttpServletRequest request
)
{
- if (!scheduler.isEnabled()) {
- return buildErrorResponseIfSchedulerDisabled();
+ if (scheduler.isEnabled()) {
+ return ServletResourceUtils.buildReadResponse(() -> {
+ final List<CompactionSupervisorSpec> configs = AuthorizationUtils.filterByAuthorizedDatasources(
+ request,
+ supervisorManager.getAllCompactionSupervisors(),
+ supervisor -> supervisor.getSpec().getDataSource(),
+ authorizerMapper
+ );
+ return new CompactionConfigsResponse(
+ configs.stream()
+ .map(CompactionSupervisorSpec::getSpec)
+ .collect(Collectors.toList())
+ );
+ });
+ } else {
+ return ServletResourceUtils.buildReadResponse(() -> {
+ final List<DataSourceCompactionConfig> configs = AuthorizationUtils.filterByAuthorizedDatasources(
+ request,
+ configManager.getCurrentCompactionConfig().getCompactionConfigs(),
+ DataSourceCompactionConfig::getDataSource,
+ authorizerMapper
+ );
+ return new CompactionConfigsResponse(configs);
+ });
}
+ }
- final Collection<AutoCompactionSnapshot> snapshots;
- if (dataSource == null || dataSource.isEmpty()) {
- snapshots = scheduler.getAllCompactionSnapshots().values();
+ @POST
+ @Path("/config/datasources/{dataSource}")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(DatasourceResourceFilter.class)
+ public Response updateDatasourceCompactionConfig(
+ @PathParam("dataSource") String dataSource,
+ DataSourceCompactionConfig newConfig,
+ @Context HttpServletRequest request
+ )
+ {
+ if (isEmpty(dataSource)) {
+ return invalidInputResponse("No DataSource specified");
+ } else if (!dataSource.equals(newConfig.getDataSource())) {
+ return invalidInputResponse(
+ "DataSource in spec[%s] does not match DataSource in path[%s]",
+ newConfig.getDataSource(), dataSource
+ );
+ }
+
+ if (scheduler.isEnabled()) {
+ final CompactionSupervisorSpec spec = new CompactionSupervisorSpec(newConfig, false, scheduler);
+ return ServletResourceUtils.buildUpdateResponse(
+ () -> supervisorManager.updateCompactionSupervisor(spec, request)
+ );
} else {
- AutoCompactionSnapshot autoCompactionSnapshot = scheduler.getCompactionSnapshot(dataSource);
- if (autoCompactionSnapshot == null) {
- return ServletResourceUtils.buildErrorResponseFrom(NotFound.exception("Unknown DataSource"));
- }
- snapshots = Collections.singleton(autoCompactionSnapshot);
+ final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(request);
+ return ServletResourceUtils.buildUpdateResponse(
+ () -> configManager.updateDatasourceCompactionConfig(newConfig, auditInfo)
+ );
+ }
+ }
+
+ @GET
+ @Path("/config/datasources/{dataSource}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(DatasourceResourceFilter.class)
+ public Response getDatasourceCompactionConfig(
+ @PathParam("dataSource") String dataSource
+ )
+ {
+ if (isEmpty(dataSource)) {
+ return invalidInputResponse("No DataSource specified");
+ }
+
+ if (scheduler.isEnabled()) {
+ return ServletResourceUtils.buildReadResponse(
+ () -> supervisorManager.getCompactionSupervisor(dataSource).getSpec()
+ );
+ } else {
+ return ServletResourceUtils.buildReadResponse(
+ () -> configManager.getDatasourceCompactionConfig(dataSource)
+ );
+ }
+ }
+
+ @DELETE
+ @Path("/config/datasources/{dataSource}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(DatasourceResourceFilter.class)
+ public Response deleteDatasourceCompactionConfig(
+ @PathParam("dataSource") String dataSource,
+ @Context HttpServletRequest req
+ )
+ {
+ if (isEmpty(dataSource)) {
+ return invalidInputResponse("No DataSource specified");
+ }
+
+ if (scheduler.isEnabled()) {
+ return ServletResourceUtils.buildUpdateResponse(
+ () -> supervisorManager.deleteCompactionSupervisor(dataSource)
+ );
+ } else {
+ final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req);
+ return ServletResourceUtils.buildUpdateResponse(
+ () -> configManager.deleteDatasourceCompactionConfig(dataSource, auditInfo)
+ );
+ }
+ }
+
+ @GET
+ @Path("/config/datasources/{dataSource}/history")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(DatasourceResourceFilter.class)
+ public Response getDatasourceCompactionConfigHistory(
+ @PathParam("dataSource") String dataSource,
+ @QueryParam("interval") String interval,
+ @QueryParam("count") Integer count
+ )
+ {
+ if (isEmpty(dataSource)) {
+ return invalidInputResponse("No DataSource specified");
+ }
+
+ if (scheduler.isEnabled()) {
+ return ServletResourceUtils.buildReadResponse(
+ () -> new CompactionConfigHistoryResponse(
+ filterByCountAndInterval(
+ supervisorManager.getCompactionSupervisorHistory(dataSource),
+ interval,
+ count
+ )
+ )
+ );
+ } else {
+ return ServletResourceUtils.buildReadResponse(
+ () -> new CompactionConfigHistoryResponse(
+ configManager.getCompactionConfigHistory(dataSource, interval, count)
+ )
+ );
}
- return Response.ok(new CompactionStatusResponse(snapshots)).build();
}
@POST
@Path("/simulate")
@Consumes(MediaType.APPLICATION_JSON)
- @ResourceFilters(StateResourceFilter.class)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(ConfigResourceFilter.class)
public Response simulateRunWithConfigUpdate(
ClusterCompactionConfig updatePayload
)
@@ -132,14 +354,54 @@ public Response simulateRunWithConfigUpdate(
).build();
}
- private Response buildErrorResponseIfSchedulerDisabled()
+ private static boolean isEmpty(String dataSource)
{
- final String msg = "Compaction Supervisors are disabled on the Overlord."
- + " Use Coordinator APIs to fetch compaction status.";
- return ServletResourceUtils.buildErrorResponseFrom(
- DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.UNSUPPORTED)
- .build(msg)
- );
+ return dataSource == null || dataSource.isEmpty();
+ }
+
+ private static Response invalidInputResponse(String message, Object... args)
+ {
+ return ServletResourceUtils.buildErrorResponseFrom(InvalidInput.exception(message, args));
+ }
+
+ private static Response buildResponse(ListenableFuture<?> future)
+ {
+ try {
+ return Response.ok(FutureUtils.getUnchecked(future, true)).build();
+ }
+ catch (Exception e) {
+ if (e.getCause() instanceof HttpResponseException) {
+ final HttpResponseException cause = (HttpResponseException) e.getCause();
+ return Response.status(cause.getResponse().getStatus().getCode())
+ .entity(cause.getResponse().getContent())
+ .build();
+ } else {
+ return ServletResourceUtils.buildErrorResponseFrom(
+ InternalServerError.exception(e.getMessage())
+ );
+ }
+ }
+ }
+
+ /**
+ * Filters the given list of audit entries by both interval and count, if
+ * specified.
+ */
+ private static List<DataSourceCompactionConfigAuditEntry> filterByCountAndInterval(
+ List<DataSourceCompactionConfigAuditEntry> entries,
+ @Nullable String serializedInterval,
+ @Nullable Integer count
+ )
+ {
+ final Interval interval = serializedInterval == null || serializedInterval.isEmpty()
+ ? null : Intervals.of(serializedInterval);
+ return entries.stream()
+ .filter(
+ entry -> interval == null
+ || entry.getAuditTime() == null
+ || interval.contains(entry.getAuditTime())
+ )
+ .limit(count == null ? Integer.MAX_VALUE : count)
+ .collect(Collectors.toList());
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/CompactionSupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/CompactionSupervisorManager.java
new file mode 100644
index 000000000000..123eee62b449
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/CompactionSupervisorManager.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import org.apache.druid.audit.AuditEntry;
+import org.apache.druid.audit.AuditManager;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.NotFound;
+import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Wrapper over {@link SupervisorManager} used by
+ * {@link org.apache.druid.indexing.overlord.http.OverlordCompactionResource}
+ * to read and write compaction supervisor specs.
+ */
+public class CompactionSupervisorManager
+{
+ private final TaskMaster taskMaster;
+ private final AuditManager auditManager;
+
+ @Inject
+ public CompactionSupervisorManager(
+ TaskMaster taskMaster,
+ AuditManager auditManager
+ )
+ {
+ this.taskMaster = taskMaster;
+ this.auditManager = auditManager;
+ }
+
+ /**
+ * Creates or updates a compaction supervisor spec.
+ *
+ * @return true if the supervisor was updated successfully or if the supervisor
+ * is already in the desired state.
+ */
+ public boolean updateCompactionSupervisor(
+ CompactionSupervisorSpec spec,
+ HttpServletRequest request
+ )
+ {
+ return performIfLeader(manager -> {
+ // Check if the spec needs to be updated
+ if (manager.shouldUpdateSupervisor(spec) && manager.createOrUpdateAndStartSupervisor(spec)) {
+ final String auditPayload
+ = StringUtils.format("Update supervisor[%s] for datasource[%s]", spec.getId(), spec.getDataSources());
+ auditManager.doAudit(
+ AuditEntry.builder()
+ .key(spec.getId())
+ .type("supervisor")
+ .auditInfo(AuthorizationUtils.buildAuditInfo(request))
+ .request(AuthorizationUtils.buildRequestInfo("overlord", request))
+ .payload(auditPayload)
+ .build()
+ );
+ }
+
+ return true;
+ });
+ }
+
+ /**
+ * Gets the compaction supervisor for the given datasource, if one exists.
+ *
+ * @throws DruidException if a compaction supervisor does not exist for this
+ * datasource or if the supervisor is of an unexpected type.
+ */
+ public CompactionSupervisorSpec getCompactionSupervisor(String dataSource)
+ {
+ final String supervisorId = CompactionSupervisorSpec.getSupervisorIdForDatasource(dataSource);
+
+ return performIfLeader(manager -> {
+ final Optional<SupervisorSpec> specOptional = manager.getSupervisorSpec(supervisorId);
+
+ if (specOptional.isPresent()) {
+ SupervisorSpec spec = specOptional.get();
+ if (spec instanceof CompactionSupervisorSpec) {
+ return (CompactionSupervisorSpec) spec;
+ } else {
+ throw DruidException.defensive(
+ "Supervisor for ID[%s] is of unexpected type[%s]",
+ supervisorId, spec.getClass().getSimpleName()
+ );
+ }
+ } else {
+ throw NotFound.exception("Compaction supervisor for datasource[%s] does not exist", dataSource);
+ }
+ });
+ }
+
+ /**
+ * Deletes the compaction supervisor for the given datasource, if one exists.
+ *
+ * @return true if the supervisor was successfully deleted.
+ * @throws DruidException if a compaction supervisor does not exist for this
+ * datasource.
+ */
+ public boolean deleteCompactionSupervisor(String dataSource)
+ {
+ final String supervisorId = CompactionSupervisorSpec.getSupervisorIdForDatasource(dataSource);
+
+ return performIfLeader(manager -> {
+ if (manager.stopAndRemoveSupervisor(supervisorId)) {
+ return true;
+ } else {
+ throw NotFound.exception("Compaction supervisor for datasource[%s] does not exist", dataSource);
+ }
+ });
+ }
+
+ /**
+ * Returns all compaction supervisors.
+ */
+ public List<CompactionSupervisorSpec> getAllCompactionSupervisors()
+ {
+ return performIfLeader(manager -> {
+ final List<CompactionSupervisorSpec> compactionSpecs = new ArrayList<>();
+ for (String supervisorId : Set.copyOf(manager.getSupervisorIds())) {
+ Optional<SupervisorSpec> supervisorSpecOptional = manager.getSupervisorSpec(supervisorId);
+ if (!supervisorSpecOptional.isPresent()) {
+ continue;
+ }
+
+ final SupervisorSpec supervisorSpec = supervisorSpecOptional.get();
+ if (supervisorSpec instanceof CompactionSupervisorSpec) {
+ compactionSpecs.add(((CompactionSupervisorSpec) supervisorSpec));
+ }
+ }
+
+ return compactionSpecs;
+ });
+ }
+
+ /**
+ * Gets the change history for the compaction supervisor of the given datasource.
+ *
+ * @return Change history for the compaction supervisor of the given datasource
+ * in descending order by update time or an empty list if no history exists
+ * for the compaction supervisor of this datasource.
+ */
+ public List<DataSourceCompactionConfigAuditEntry> getCompactionSupervisorHistory(String dataSource)
+ {
+ final String supervisorId = CompactionSupervisorSpec.getSupervisorIdForDatasource(dataSource);
+
+ return performIfLeader(
+ manager -> manager
+ .getSupervisorHistoryForId(supervisorId)
+ .stream()
+ .filter(versionedSpec -> versionedSpec.getSpec() instanceof CompactionSupervisorSpec)
+ .map(
+ versionedSupervisorSpec -> new DataSourceCompactionConfigAuditEntry(
+ null,
+ ((CompactionSupervisorSpec) versionedSupervisorSpec.getSpec()).getSpec(),
+ null,
+ nullSafeDate(versionedSupervisorSpec.getVersion())
+ )
+ )
+ .collect(Collectors.toList())
+ );
+ }
+
+ private <T> T performIfLeader(Function<SupervisorManager, T> managerFunction)
+ {
+ Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();
+ if (supervisorManager.isPresent()) {
+ return managerFunction.apply(supervisorManager.get());
+ } else {
+ // Encourage client to try again soon, when we'll likely have a redirect set up
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.SERVICE_UNAVAILABLE)
+ .build("Overlord is not leader");
+ }
+ }
+
+ @Nullable
+ private static DateTime nullSafeDate(String date)
+ {
+ return date == null || date.isEmpty() ? null : DateTimes.of(date);
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
index 0e128e4cce5a..b204a69ed384 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
@@ -72,6 +72,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
public class OverlordCompactionSchedulerTest
{
@@ -90,7 +91,7 @@ public class OverlordCompactionSchedulerTest
);
}
- private DruidCompactionConfig compactionConfig;
+ private AtomicReference<ClusterCompactionConfig> compactionConfig;
private CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig;
private TaskMaster taskMaster;
@@ -127,13 +128,13 @@ public void setUp()
serviceEmitter = new StubServiceEmitter();
segmentsMetadataManager = new TestSegmentsMetadataManager();
- compactionConfig = DruidCompactionConfig.empty();
+ compactionConfig = new AtomicReference<>(new ClusterCompactionConfig(null, null, null, true, null));
coordinatorOverlordServiceConfig = new CoordinatorOverlordServiceConfig(false, null);
- initScheduler(true);
+ initScheduler();
}
- private void initScheduler(boolean isSupervisorEnabled)
+ private void initScheduler()
{
TaskLockbox taskLockbox = new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator());
WorkerBehaviorConfig defaultWorkerConfig
@@ -142,9 +143,7 @@ private void initScheduler(boolean isSupervisorEnabled)
taskMaster,
new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, () -> defaultWorkerConfig),
segmentsMetadataManager,
- () -> compactionConfig.withClusterConfig(
- new ClusterCompactionConfig(null, null, null, isSupervisorEnabled, null)
- ),
+ () -> DruidCompactionConfig.empty().withClusterConfig(compactionConfig.get()),
new CompactionStatusTracker(OBJECT_MAPPER),
coordinatorOverlordServiceConfig,
(nameFormat, numThreads) -> new WrappingScheduledExecutorService("test", executor, false),
@@ -154,69 +153,120 @@ private void initScheduler(boolean isSupervisorEnabled)
}
@Test
- public void testStartStopWhenSchedulerIsEnabled()
+ public void testBecomeLeader_triggersStart_ifEnabled()
{
+ Assert.assertTrue(scheduler.isEnabled());
+
Assert.assertFalse(scheduler.isRunning());
+ Assert.assertFalse(executor.hasPendingTasks());
+
+ scheduler.becomeLeader();
+ runScheduledJob();
- scheduler.start();
Assert.assertTrue(scheduler.isRunning());
- Assert.assertTrue(executor.hasPendingTasks());
- scheduler.stop();
+ }
+
+ @Test
+ public void testBecomeLeader_doesNotTriggerStart_ifDisabled()
+ {
+ disableScheduler();
+ Assert.assertFalse(scheduler.isEnabled());
+
+ Assert.assertFalse(scheduler.isRunning());
+
+ scheduler.becomeLeader();
+ runScheduledJob();
+
Assert.assertFalse(scheduler.isRunning());
- Assert.assertTrue(executor.hasPendingTasks());
+ }
+
+ @Test
+ public void testStopBeingLeader_triggersStop()
+ {
+ Assert.assertFalse(scheduler.isRunning());
+
+ scheduler.becomeLeader();
+ runScheduledJob();
+ Assert.assertTrue(scheduler.isRunning());
- scheduler.start();
+ scheduler.stopBeingLeader();
Assert.assertTrue(scheduler.isRunning());
- scheduler.stop();
+
+ runScheduledJob();
Assert.assertFalse(scheduler.isRunning());
}
@Test
- public void testStartStopWhenScheduledIsDisabled()
+ public void testDisablingScheduler_triggersStop()
{
- initScheduler(false);
+ // Start scheduler
+ scheduler.becomeLeader();
+ runScheduledJob();
+ Assert.assertTrue(scheduler.isRunning());
+ // Disable scheduler to trigger stop
+ disableScheduler();
+ Assert.assertFalse(scheduler.isEnabled());
+ Assert.assertTrue(scheduler.isRunning());
+
+ // Scheduler finally stops in the next schedule cycle
+ runScheduledJob();
Assert.assertFalse(scheduler.isRunning());
- scheduler.start();
+ }
+
+ @Test
+ public void testEnablingScheduler_triggersStart()
+ {
+ disableScheduler();
+
+ // Becoming leader does not trigger start since scheduler is disabled
+ scheduler.becomeLeader();
+ runScheduledJob();
Assert.assertFalse(scheduler.isRunning());
- Assert.assertFalse(executor.hasPendingTasks());
- scheduler.stop();
+
+ // Enable the scheduler to trigger start
+ enableScheduler();
Assert.assertFalse(scheduler.isRunning());
- Assert.assertFalse(executor.hasPendingTasks());
+
+ // Scheduler finally starts in the next schedule cycle
+ runScheduledJob();
+ Assert.assertTrue(scheduler.isRunning());
}
@Test
- public void testSegmentsAreNotPolledWhenSchedulerIsDisabled()
+ public void testSegmentsAreNotPolled_ifSupervisorsAreDisabled()
{
- initScheduler(false);
+ disableScheduler();
verifySegmentPolling(false);
}
@Test
- public void testSegmentsArePolledWhenRunningInStandaloneMode()
+ public void testSegmentsArePolled_whenRunningInStandaloneMode()
{
coordinatorOverlordServiceConfig = new CoordinatorOverlordServiceConfig(false, null);
- initScheduler(true);
+ initScheduler();
verifySegmentPolling(true);
}
@Test
- public void testSegmentsAreNotPolledWhenRunningInCoordinatorMode()
+ public void testSegmentsAreNotPolled_whenRunningInCoordinatorMode()
{
coordinatorOverlordServiceConfig = new CoordinatorOverlordServiceConfig(true, "overlord");
- initScheduler(true);
+ initScheduler();
verifySegmentPolling(false);
}
private void verifySegmentPolling(boolean enabled)
{
- scheduler.start();
+ scheduler.becomeLeader();
+ runScheduledJob();
Assert.assertEquals(enabled, segmentsMetadataManager.isPollingDatabasePeriodically());
- scheduler.stop();
+ scheduler.stopBeingLeader();
+ runScheduledJob();
Assert.assertFalse(segmentsMetadataManager.isPollingDatabasePeriodically());
}
@@ -247,12 +297,12 @@ public void testMsqCompactionConfigWithOneMaxTasksIsInvalid()
}
@Test
- public void testStartCompactionForDatasource()
+ public void testStartCompaction()
{
final List<DataSegment> wikiSegments = CreateDataSegments.ofDatasource(TestDataSource.WIKI).eachOfSizeInMb(100);
wikiSegments.forEach(segmentsMetadataManager::addSegment);
- scheduler.start();
+ scheduler.becomeLeader();
scheduler.startCompaction(
TestDataSource.WIKI,
InlineSchemaDataSourceCompactionConfig.builder()
@@ -288,16 +338,16 @@ public void testStartCompactionForDatasource()
serviceEmitter.verifyValue(Stats.Compaction.SUBMITTED_TASKS.getMetricName(), 1L);
serviceEmitter.verifyValue(Stats.Compaction.COMPACTED_BYTES.getMetricName(), 100_000_000L);
- scheduler.stop();
+ scheduler.stopBeingLeader();
}
@Test
- public void testStopCompactionForDatasource()
+ public void testStopCompaction()
{
final List<DataSegment> wikiSegments = CreateDataSegments.ofDatasource(TestDataSource.WIKI).eachOfSizeInMb(100);
wikiSegments.forEach(segmentsMetadataManager::addSegment);
- scheduler.start();
+ scheduler.becomeLeader();
scheduler.startCompaction(
TestDataSource.WIKI,
InlineSchemaDataSourceCompactionConfig.builder()
@@ -311,17 +361,22 @@ public void testStopCompactionForDatasource()
Mockito.verify(taskQueue, Mockito.never()).add(ArgumentMatchers.any());
- Assert.assertNull(scheduler.getCompactionSnapshot(TestDataSource.WIKI));
+ Assert.assertEquals(
+ AutoCompactionSnapshot.builder(TestDataSource.WIKI)
+ .withStatus(AutoCompactionSnapshot.ScheduleStatus.NOT_ENABLED)
+ .build(),
+ scheduler.getCompactionSnapshot(TestDataSource.WIKI)
+ );
Assert.assertTrue(scheduler.getAllCompactionSnapshots().isEmpty());
serviceEmitter.verifyNotEmitted(Stats.Compaction.SUBMITTED_TASKS.getMetricName());
serviceEmitter.verifyNotEmitted(Stats.Compaction.COMPACTED_BYTES.getMetricName());
- scheduler.stop();
+ scheduler.stopBeingLeader();
}
@Test
- public void testRunSimulation()
+ public void testSimulateRun()
{
final List<DataSegment> wikiSegments = CreateDataSegments
.ofDatasource(TestDataSource.WIKI)
@@ -331,7 +386,9 @@ public void testRunSimulation()
.eachOfSizeInMb(100);
wikiSegments.forEach(segmentsMetadataManager::addSegment);
- scheduler.start();
+ scheduler.becomeLeader();
+ runScheduledJob();
+
scheduler.startCompaction(
TestDataSource.WIKI,
InlineSchemaDataSourceCompactionConfig.builder()
@@ -370,7 +427,22 @@ public void testRunSimulation()
);
Assert.assertTrue(simulateResultWhenDisabled.getCompactionStates().isEmpty());
- scheduler.stop();
+ scheduler.stopBeingLeader();
+ }
+
+ private void disableScheduler()
+ {
+ compactionConfig.set(new ClusterCompactionConfig(null, null, null, false, null));
+ }
+
+ private void enableScheduler()
+ {
+ compactionConfig.set(new ClusterCompactionConfig(null, null, null, true, null));
+ }
+
+ private void runScheduledJob()
+ {
+ executor.finishNextPendingTask();
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java
index de9cd7ec6c06..50bae6c8d2a6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java
@@ -19,215 +19,448 @@
package org.apache.druid.indexing.overlord.http;
-import com.google.common.collect.ImmutableMap;
-import org.apache.druid.error.DruidException;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import org.apache.druid.audit.AuditManager;
+import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.error.ErrorResponse;
+import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexing.compact.CompactionScheduler;
+import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.supervisor.CompactionSupervisorManager;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
+import org.apache.druid.indexing.overlord.supervisor.VersionedSupervisorSpec;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.TestDataSource;
-import org.apache.druid.server.compaction.CompactionProgressResponse;
-import org.apache.druid.server.compaction.CompactionStatistics;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
+import org.apache.druid.server.coordinator.CoordinatorConfigManager;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry;
+import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
+import org.apache.druid.server.security.AllowAllAuthorizer;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.AuthorizerMapper;
import org.easymock.EasyMock;
import org.hamcrest.MatcherAssert;
+import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
-import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
public class OverlordCompactionResourceTest
{
+ private final Random random = new Random(1000);
+
private CompactionScheduler scheduler;
+ private OverlordCompactionResource compactionResource;
+
+ private CoordinatorClient coordinatorClient;
+ private CoordinatorConfigManager configManager;
+
+ private HttpServletRequest httpRequest;
+ private AuthorizerMapper authorizerMapper;
+
+ private TaskMaster taskMaster;
+ private SupervisorManager supervisorManager;
+ private final AtomicBoolean useSupervisors = new AtomicBoolean(false);
+
+ private DataSourceCompactionConfig wikiConfig;
+
+ /**
+ * Mock instance of CompactionScheduler used only for validating compaction configs.
+ */
+ private CompactionScheduler validator;
@Before
public void setUp()
{
+ useSupervisors.set(true);
scheduler = EasyMock.createStrictMock(CompactionScheduler.class);
- EasyMock.expect(scheduler.isEnabled()).andReturn(true).anyTimes();
+ EasyMock.expect(scheduler.isEnabled()).andAnswer(useSupervisors::get).anyTimes();
+
+ coordinatorClient = EasyMock.createStrictMock(CoordinatorClient.class);
+ supervisorManager = EasyMock.createStrictMock(SupervisorManager.class);
+ configManager = EasyMock.createStrictMock(CoordinatorConfigManager.class);
+
+ httpRequest = EasyMock.createStrictMock(HttpServletRequest.class);
+ authorizerMapper = EasyMock.createStrictMock(AuthorizerMapper.class);
+ EasyMock.expect(authorizerMapper.getAuthorizer("druid")).andReturn(new AllowAllAuthorizer()).anyTimes();
+
+ taskMaster = EasyMock.createStrictMock(TaskMaster.class);
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).anyTimes();
+
+ final AuditManager auditManager = EasyMock.createStrictMock(AuditManager.class);
+
+ compactionResource = new OverlordCompactionResource(
+ scheduler,
+ authorizerMapper,
+ coordinatorClient,
+ configManager,
+ new CompactionSupervisorManager(taskMaster, auditManager)
+ );
+
+ wikiConfig = InlineSchemaDataSourceCompactionConfig
+ .builder()
+ .forDataSource(TestDataSource.WIKI)
+ .withTaskPriority(random.nextInt(100))
+ .withSkipOffsetFromLatest(Period.days(random.nextInt(5)))
+ .build();
+
+ validator = EasyMock.createStrictMock(CompactionScheduler.class);
+ EasyMock.expect(validator.validateCompactionConfig(EasyMock.anyObject()))
+ .andReturn(CompactionConfigValidationResult.success())
+ .anyTimes();
+ EasyMock.replay(validator, taskMaster, authorizerMapper);
}
@After
public void tearDown()
{
- EasyMock.verify(scheduler);
+ EasyMock.verify(
+ validator,
+ scheduler,
+ taskMaster,
+ httpRequest,
+ configManager,
+ authorizerMapper,
+ coordinatorClient,
+ supervisorManager
+ );
+ }
+
+ private void replayAll()
+ {
+ EasyMock.replay(scheduler, httpRequest, configManager, coordinatorClient, supervisorManager);
}
@Test
- public void testGetCompactionSnapshotWithEmptyDatasource()
+ public void test_updateClusterConfig()
{
-    final Map<String, AutoCompactionSnapshot> allSnapshots = ImmutableMap.of(
- TestDataSource.WIKI,
- AutoCompactionSnapshot.builder(TestDataSource.WIKI).build()
- );
+ EasyMock.expect(configManager.updateClusterCompactionConfig(EasyMock.anyObject(), EasyMock.anyObject()))
+ .andReturn(true)
+ .once();
- EasyMock.expect(scheduler.getAllCompactionSnapshots())
- .andReturn(allSnapshots).once();
- EasyMock.replay(scheduler);
+ setupMockRequestForAudit();
+ replayAll();
- final Response response = new OverlordCompactionResource(scheduler)
- .getCompactionSnapshots("");
- Assert.assertEquals(200, response.getStatus());
- Assert.assertEquals(
- new CompactionStatusResponse(allSnapshots.values()),
- response.getEntity()
+ Response response = compactionResource.updateClusterCompactionConfig(
+ new ClusterCompactionConfig(0.5, 10, null, true, CompactionEngine.MSQ),
+ httpRequest
);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(Map.of("success", true), response.getEntity());
}
@Test
- public void testGetCompactionSnapshotWithNullDatasource()
+ public void test_getClusterConfig()
{
-    final Map<String, AutoCompactionSnapshot> allSnapshots = ImmutableMap.of(
- TestDataSource.WIKI,
- AutoCompactionSnapshot.builder(TestDataSource.WIKI).build()
- );
+ final ClusterCompactionConfig clusterConfig =
+ new ClusterCompactionConfig(0.4, 100, null, true, CompactionEngine.MSQ);
+ EasyMock.expect(configManager.getClusterCompactionConfig())
+ .andReturn(clusterConfig)
+ .once();
+ replayAll();
+
+ final Response response = compactionResource.getClusterCompactionConfig();
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(clusterConfig, response.getEntity());
+ }
- EasyMock.expect(scheduler.getAllCompactionSnapshots())
- .andReturn(allSnapshots).once();
- EasyMock.replay(scheduler);
+ @Test
+ public void test_getDatasourceCompactionSnapshot_returnsInvalidInput_ifDatasourceIsNullOrEmpty()
+ {
+ replayAll();
- final Response response = new OverlordCompactionResource(scheduler)
- .getCompactionSnapshots(null);
- Assert.assertEquals(200, response.getStatus());
- Assert.assertEquals(
- new CompactionStatusResponse(allSnapshots.values()),
- response.getEntity()
+ verifyInvalidInputResponse(
+ compactionResource.getDatasourceCompactionSnapshot(""),
+ "No DataSource specified"
+ );
+ verifyInvalidInputResponse(
+ compactionResource.getDatasourceCompactionSnapshot(null),
+ "No DataSource specified"
);
}
@Test
- public void testGetCompactionSnapshotWithValidDatasource()
+ public void test_getDatasourceCompactionSnapshot()
{
final AutoCompactionSnapshot snapshot = AutoCompactionSnapshot.builder(TestDataSource.WIKI).build();
EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.WIKI))
.andReturn(snapshot).once();
- EasyMock.replay(scheduler);
+ replayAll();
- final Response response = new OverlordCompactionResource(scheduler)
- .getCompactionSnapshots(TestDataSource.WIKI);
+ final Response response = compactionResource.getDatasourceCompactionSnapshot(TestDataSource.WIKI);
Assert.assertEquals(200, response.getStatus());
- Assert.assertEquals(
- new CompactionStatusResponse(Collections.singleton(snapshot)),
- response.getEntity()
- );
+ Assert.assertEquals(snapshot, response.getEntity());
}
@Test
- public void testGetCompactionSnapshotWithInvalidDatasource()
+ public void test_getDatasourceCompactionSnapshot_returnsNotFound_withInvalidDatasource()
{
EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.KOALA))
.andReturn(null).once();
- EasyMock.replay(scheduler);
+ replayAll();
- final Response response = new OverlordCompactionResource(scheduler)
- .getCompactionSnapshots(TestDataSource.KOALA);
+ final Response response = compactionResource.getDatasourceCompactionSnapshot(TestDataSource.KOALA);
Assert.assertEquals(404, response.getStatus());
}
@Test
- public void testGetProgressForValidDatasource()
+ public void test_getDatasourceCompactionSnapshot_redirectsToCoordinator_ifSchedulerIsDisabled()
{
- final AutoCompactionSnapshot.Builder snapshotBuilder
- = AutoCompactionSnapshot.builder(TestDataSource.WIKI);
- snapshotBuilder.incrementWaitingStats(CompactionStatistics.create(100L, 10L, 1L));
- final AutoCompactionSnapshot snapshot = snapshotBuilder.build();
+ useSupervisors.set(false);
- EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.WIKI))
- .andReturn(snapshot).once();
- EasyMock.replay(scheduler);
+ final AutoCompactionSnapshot snapshot =
+ AutoCompactionSnapshot.builder(TestDataSource.WIKI).build();
+ EasyMock.expect(coordinatorClient.getCompactionSnapshots(TestDataSource.WIKI))
+ .andReturn(Futures.immediateFuture(new CompactionStatusResponse(List.of(snapshot))));
+ replayAll();
- final Response response = new OverlordCompactionResource(scheduler)
- .getCompactionProgress(TestDataSource.WIKI);
+ final Response response = compactionResource.getDatasourceCompactionSnapshot(TestDataSource.WIKI);
Assert.assertEquals(200, response.getStatus());
- Assert.assertEquals(new CompactionProgressResponse(100L), response.getEntity());
+ Assert.assertEquals(snapshot, response.getEntity());
}
@Test
- public void testGetProgressForNullDatasourceReturnsBadRequest()
+ public void test_getAllCompactionSnapshots()
{
- EasyMock.replay(scheduler);
+ final AutoCompactionSnapshot snapshot =
+ AutoCompactionSnapshot.builder(TestDataSource.WIKI).build();
- final Response response = new OverlordCompactionResource(scheduler)
- .getCompactionProgress(null);
- Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+ setupMockRequestForUser("druid");
+ EasyMock.expect(httpRequest.getMethod()).andReturn("POST").once();
- final Object responseEntity = response.getEntity();
- Assert.assertTrue(responseEntity instanceof ErrorResponse);
+ EasyMock.expect(scheduler.getAllCompactionSnapshots())
+ .andReturn(Map.of(TestDataSource.WIKI, snapshot)).anyTimes();
+ replayAll();
- MatcherAssert.assertThat(
- ((ErrorResponse) responseEntity).getUnderlyingException(),
- DruidExceptionMatcher.invalidInput().expectMessageIs("No DataSource specified")
- );
+ final Response response = compactionResource.getAllCompactionSnapshots(httpRequest);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(new CompactionStatusResponse(List.of(snapshot)), response.getEntity());
}
@Test
- public void testGetProgressForInvalidDatasourceReturnsNotFound()
+ public void test_getDatasourceCompactionConfig()
{
- EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.KOALA))
- .andReturn(null).once();
- EasyMock.replay(scheduler);
+ final String supervisorId = CompactionSupervisorSpec.getSupervisorIdForDatasource(TestDataSource.WIKI);
+ EasyMock.expect(supervisorManager.getSupervisorSpec(supervisorId))
+ .andReturn(Optional.of(new CompactionSupervisorSpec(wikiConfig, false, validator)))
+ .anyTimes();
+ replayAll();
- final Response response = new OverlordCompactionResource(scheduler)
- .getCompactionProgress(TestDataSource.KOALA);
- Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus());
+ final Response response = compactionResource.getDatasourceCompactionConfig(TestDataSource.WIKI);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(wikiConfig, response.getEntity());
+ }
- final Object responseEntity = response.getEntity();
- Assert.assertTrue(responseEntity instanceof ErrorResponse);
+ @Test
+ public void test_getDatasourceCompactionConfig_returnsInvalidInput_ifDatasourceIsNullOrEmpty()
+ {
+ replayAll();
- MatcherAssert.assertThat(
- ((ErrorResponse) responseEntity).getUnderlyingException(),
- DruidExceptionMatcher.notFound().expectMessageIs("Unknown DataSource")
+ verifyInvalidInputResponse(
+ compactionResource.getDatasourceCompactionConfig(""),
+ "No DataSource specified"
+ );
+ verifyInvalidInputResponse(
+ compactionResource.getDatasourceCompactionConfig(null),
+ "No DataSource specified"
);
}
@Test
- public void testGetProgressReturnsUnsupportedWhenSupervisorDisabled()
+ public void test_updateDatasourceCompactionConfig()
{
- scheduler = EasyMock.createStrictMock(CompactionScheduler.class);
- EasyMock.expect(scheduler.isEnabled()).andReturn(false).once();
- EasyMock.replay(scheduler);
+ setupMockRequestForAudit();
+ EasyMock.expect(httpRequest.getMethod()).andReturn("POST").once();
+ EasyMock.expect(httpRequest.getRequestURI()).andReturn("supes").once();
+ EasyMock.expect(httpRequest.getQueryString()).andReturn("a=b").once();
+
+ final CompactionSupervisorSpec supervisorSpec =
+ new CompactionSupervisorSpec(wikiConfig, false, validator);
+
+ EasyMock.expect(supervisorManager.shouldUpdateSupervisor(supervisorSpec))
+ .andReturn(true).once();
+ EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(supervisorSpec))
+ .andReturn(true).once();
+ EasyMock.expect(scheduler.validateCompactionConfig(wikiConfig))
+ .andReturn(CompactionConfigValidationResult.success()).once();
+ replayAll();
+
+ final Response response = compactionResource
+ .updateDatasourceCompactionConfig(TestDataSource.WIKI, wikiConfig, httpRequest);
+ Assert.assertEquals(200, response.getStatus());
+ }
- verifyResponseWhenSupervisorDisabled(
- new OverlordCompactionResource(scheduler)
- .getCompactionProgress(TestDataSource.WIKI)
+ @Test
+ public void test_updateDatasourceCompactionConfig_returnsInvalidInput_ifDatasourceIsNullOrEmpty()
+ {
+ replayAll();
+
+ verifyInvalidInputResponse(
+ compactionResource.updateDatasourceCompactionConfig("", wikiConfig, httpRequest),
+ "No DataSource specified"
+ );
+ verifyInvalidInputResponse(
+ compactionResource.updateDatasourceCompactionConfig(null, wikiConfig, httpRequest),
+ "No DataSource specified"
);
}
@Test
- public void testGetSnapshotReturnsUnsupportedWhenSupervisorDisabled()
+ public void test_deleteDatasourceCompactionConfig()
{
- scheduler = EasyMock.createStrictMock(CompactionScheduler.class);
- EasyMock.expect(scheduler.isEnabled()).andReturn(false).once();
- EasyMock.replay(scheduler);
+ final String supervisorId = CompactionSupervisorSpec.getSupervisorIdForDatasource(TestDataSource.WIKI);
+ EasyMock.expect(supervisorManager.stopAndRemoveSupervisor(supervisorId))
+ .andReturn(true)
+ .once();
+ replayAll();
+
+ final Response response = compactionResource
+ .deleteDatasourceCompactionConfig(TestDataSource.WIKI, httpRequest);
+ Assert.assertEquals(200, response.getStatus());
+ }
+
+ @Test
+ public void test_deleteDatasourceCompactionConfig_returnsInvalidInput_ifDatasourceIsNullOrEmpty()
+ {
+ replayAll();
- verifyResponseWhenSupervisorDisabled(
- new OverlordCompactionResource(scheduler)
- .getCompactionSnapshots(TestDataSource.WIKI)
+ verifyInvalidInputResponse(
+ compactionResource.deleteDatasourceCompactionConfig("", httpRequest),
+ "No DataSource specified"
+ );
+ verifyInvalidInputResponse(
+ compactionResource.deleteDatasourceCompactionConfig(null, httpRequest),
+ "No DataSource specified"
);
}
- private void verifyResponseWhenSupervisorDisabled(Response response)
+ @Test
+ public void test_getAllCompactionConfigs()
{
- Assert.assertEquals(501, response.getStatus());
+ final String supervisorId = CompactionSupervisorSpec
+ .getSupervisorIdForDatasource(TestDataSource.WIKI);
+
+ setupMockRequestForUser("druid");
+ EasyMock.expect(httpRequest.getMethod()).andReturn("POST").once();
+
+ EasyMock.expect(supervisorManager.getSupervisorIds())
+ .andReturn(Set.of(supervisorId))
+ .once();
+ EasyMock.expect(supervisorManager.getSupervisorSpec(supervisorId))
+ .andReturn(Optional.of(new CompactionSupervisorSpec(wikiConfig, false, validator)))
+ .once();
+ replayAll();
+
+ final Response response = compactionResource.getAllCompactionConfigs(httpRequest);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(new CompactionConfigsResponse(List.of(wikiConfig)), response.getEntity());
+ }
+
+ @Test
+ public void test_getDatasourceCompactionConfigHistory_withFilters()
+ {
+ final String supervisorId = CompactionSupervisorSpec.getSupervisorIdForDatasource(TestDataSource.WIKI);
+
+ final CompactionSupervisorSpec spec = new CompactionSupervisorSpec(wikiConfig, false, validator);
+    final List<VersionedSupervisorSpec> specVersions = List.of(
+ new VersionedSupervisorSpec(spec, "2025-03"),
+ new VersionedSupervisorSpec(spec, "2025-02"),
+ new VersionedSupervisorSpec(spec, "2025-01")
+ );
+
+ EasyMock.expect(supervisorManager.getSupervisorHistoryForId(supervisorId))
+ .andReturn(specVersions)
+ .anyTimes();
+ replayAll();
- final Object responseEntity = response.getEntity();
- Assert.assertTrue(responseEntity instanceof ErrorResponse);
+    final List<DataSourceCompactionConfigAuditEntry> history = List.of(
+ new DataSourceCompactionConfigAuditEntry(null, wikiConfig, null, DateTimes.of("2025-03")),
+ new DataSourceCompactionConfigAuditEntry(null, wikiConfig, null, DateTimes.of("2025-02")),
+ new DataSourceCompactionConfigAuditEntry(null, wikiConfig, null, DateTimes.of("2025-01"))
+ );
+
+ Response response = compactionResource
+ .getDatasourceCompactionConfigHistory(TestDataSource.WIKI, null, null);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(toHistoryResponse(history), response.getEntity());
+
+ // Filter by count
+ response = compactionResource
+ .getDatasourceCompactionConfigHistory(TestDataSource.WIKI, null, 2);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(toHistoryResponse(history.subList(0, 2)), response.getEntity());
+
+ // Filter by interval
+ response = compactionResource
+ .getDatasourceCompactionConfigHistory(TestDataSource.WIKI, "2025-01/P40D", null);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(toHistoryResponse(history.subList(1, 3)), response.getEntity());
+
+ // Filter by interval and count
+ response = compactionResource
+ .getDatasourceCompactionConfigHistory(TestDataSource.WIKI, "2025-01/P40D", 1);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(toHistoryResponse(history.subList(1, 2)), response.getEntity());
+ }
+ private static CompactionConfigHistoryResponse toHistoryResponse(
+      List<DataSourceCompactionConfigAuditEntry> entries
+ )
+ {
+ return new CompactionConfigHistoryResponse(entries);
+ }
+
+ private void verifyInvalidInputResponse(Response response, String message)
+ {
+ Assert.assertEquals(400, response.getStatus());
+ Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
MatcherAssert.assertThat(
- ((ErrorResponse) responseEntity).getUnderlyingException(),
- new DruidExceptionMatcher(
- DruidException.Persona.USER,
- DruidException.Category.UNSUPPORTED,
- "general"
- ).expectMessageIs(
- "Compaction Supervisors are disabled on the Overlord."
- + " Use Coordinator APIs to fetch compaction status."
- )
+ ((ErrorResponse) response.getEntity()).getUnderlyingException(),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(message)
);
}
+
+ private void setupMockRequestForUser(String user)
+ {
+ EasyMock.expect(httpRequest.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
+ EasyMock.expect(httpRequest.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
+ EasyMock.expect(httpRequest.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+ .andReturn(new AuthenticationResult(user, "druid", null, null))
+ .atLeastOnce();
+ httpRequest.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+ EasyMock.expectLastCall().anyTimes();
+ }
+
+ private void setupMockRequestForAudit()
+ {
+ EasyMock.expect(httpRequest.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn("author").once();
+ EasyMock.expect(httpRequest.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn("comment").once();
+
+ EasyMock.expect(httpRequest.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+ .andReturn(new AuthenticationResult("druid", "druid", null, null))
+ .atLeastOnce();
+
+ EasyMock.expect(httpRequest.getRemoteAddr()).andReturn("127.0.0.1").atLeastOnce();
+    // getMethod() and the DRUID_ALLOW_UNSECURED_PATH attribute are stubbed
+    // individually by the tests that exercise those code paths, so they are
+    // intentionally not stubbed here.
+ }
}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
index da0b2f3516d0..9649a2cce707 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
@@ -22,12 +22,16 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.http.CompactionConfigsResponse;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.server.compaction.CompactionSimulateResult;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.testing.IntegrationTestingConfig;
@@ -41,9 +45,12 @@
public class CompactionResourceTestClient
{
+ private static final Logger log = new Logger(CompactionResourceTestClient.class);
+
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final String coordinator;
+ private final String overlord;
private final StatusResponseHandler responseHandler;
@Inject
@@ -56,6 +63,7 @@ public class CompactionResourceTestClient
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.coordinator = config.getCoordinatorUrl();
+ this.overlord = config.getOverlordUrl();
this.responseHandler = StatusResponseHandler.getInstance();
}
@@ -67,14 +75,24 @@ private String getCoordinatorURL()
);
}
+ private String getOverlordURL()
+ {
+ return StringUtils.format("%s/druid/indexer/v1", overlord);
+ }
+
public void submitCompactionConfig(final DataSourceCompactionConfig dataSourceCompactionConfig) throws Exception
{
- String url = StringUtils.format("%sconfig/compaction", getCoordinatorURL());
+ final String dataSource = dataSourceCompactionConfig.getDataSource();
+ String url = StringUtils.format(
+ "%s/compaction/config/datasources/%s",
+ getOverlordURL(), StringUtils.urlEncode(dataSource)
+ );
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(url)).setContent(
"application/json",
jsonMapper.writeValueAsBytes(dataSourceCompactionConfig)
- ), responseHandler
+ ),
+ responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -84,11 +102,18 @@ public void submitCompactionConfig(final DataSourceCompactionConfig dataSourceCo
response.getContent()
);
}
+ log.info(
+ "Submitted compaction config for datasource[%s] with response[%s]",
+ dataSource, response.getContent()
+ );
}
public void deleteDataSourceCompactionConfig(final String dataSource) throws Exception
{
- String url = StringUtils.format("%sconfig/compaction/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
+ String url = StringUtils.format(
+ "%s/compaction/config/datasources/%s",
+ getOverlordURL(), StringUtils.urlEncode(dataSource)
+ );
StatusResponseHolder response = httpClient.go(new Request(HttpMethod.DELETE, new URL(url)), responseHandler).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -100,7 +125,12 @@ public void deleteDataSourceCompactionConfig(final String dataSource) throws Exc
}
}
- public DruidCompactionConfig getCompactionConfig() throws Exception
+ /**
+ * For all purposes, use the new APIs {@link #getClusterConfig()} or
+ * {@link #getAllCompactionConfigs()}.
+ */
+ @Deprecated
+ public DruidCompactionConfig getCoordinatorCompactionConfig() throws Exception
{
String url = StringUtils.format("%sconfig/compaction", getCoordinatorURL());
StatusResponseHolder response = httpClient.go(
@@ -116,9 +146,32 @@ public DruidCompactionConfig getCompactionConfig() throws Exception
return jsonMapper.readValue(response.getContent(), new TypeReference<>() {});
}
+  public List<DataSourceCompactionConfig> getAllCompactionConfigs() throws Exception
+ {
+ String url = StringUtils.format("%s/compaction/config/datasources", getOverlordURL());
+ StatusResponseHolder response = httpClient.go(
+ new Request(HttpMethod.GET, new URL(url)), responseHandler
+ ).get();
+ if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+ throw new ISE(
+ "Error while getting compaction config status[%s] content[%s]",
+ response.getStatus(),
+ response.getContent()
+ );
+ }
+ final CompactionConfigsResponse payload = jsonMapper.readValue(
+ response.getContent(),
+ new TypeReference<>() {}
+ );
+ return payload.getCompactionConfigs();
+ }
+
public DataSourceCompactionConfig getDataSourceCompactionConfig(String dataSource) throws Exception
{
- String url = StringUtils.format("%sconfig/compaction/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
+ String url = StringUtils.format(
+ "%s/compaction/config/datasources/%s",
+ getOverlordURL(), StringUtils.urlEncode(dataSource)
+ );
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.GET, new URL(url)), responseHandler
).get();
@@ -134,6 +187,22 @@ public DataSourceCompactionConfig getDataSourceCompactionConfig(String dataSourc
public void forceTriggerAutoCompaction() throws Exception
{
+ // Perform a dummy update of task slots to force the coordinator to refresh its compaction config
+ final ClusterCompactionConfig clusterConfig = getClusterConfig();
+ updateCompactionTaskSlot(
+ clusterConfig.getCompactionTaskSlotRatio(),
+ clusterConfig.getMaxCompactionTaskSlots() + 10
+ );
+ updateCompactionTaskSlot(
+ clusterConfig.getCompactionTaskSlotRatio(),
+ clusterConfig.getMaxCompactionTaskSlots()
+ );
+ final CompactionSimulateResult simulateResult = simulateRunOnCoordinator();
+ log.info(
+ "Triggering compaction duty on Coordinator. Expected jobs: %s",
+ simulateResult.getCompactionStates()
+ );
+
String url = StringUtils.format("%scompaction/compact", getCoordinatorURL());
StatusResponseHolder response = httpClient.go(new Request(HttpMethod.POST, new URL(url)), responseHandler).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -145,9 +214,52 @@ public void forceTriggerAutoCompaction() throws Exception
}
}
- public void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer maxCompactionTaskSlots) throws Exception
+ public void updateClusterConfig(ClusterCompactionConfig config) throws Exception
{
- String url = StringUtils.format(
+ final String url = StringUtils.format(
+ "%s/compaction/config/cluster",
+ getOverlordURL()
+ );
+ StatusResponseHolder response = httpClient.go(
+ new Request(HttpMethod.POST, new URL(url)).setContent(
+ "application/json",
+ jsonMapper.writeValueAsBytes(config)
+ ),
+ responseHandler
+ ).get();
+ if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+ throw new ISE(
+ "Error while updating cluster compaction config, status[%s], content[%s]",
+ response.getStatus(),
+ response.getContent()
+ );
+ }
+ }
+
+ public ClusterCompactionConfig getClusterConfig() throws Exception
+ {
+ String url = StringUtils.format("%s/compaction/config/cluster", getOverlordURL());
+ StatusResponseHolder response = httpClient.go(
+ new Request(HttpMethod.GET, new URL(url)), responseHandler
+ ).get();
+ if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+ throw new ISE(
+ "Error while getting compaction config status[%s] content[%s]",
+ response.getStatus(),
+ response.getContent()
+ );
+ }
+ return jsonMapper.readValue(response.getContent(), new TypeReference<>() {});
+ }
+
+ /**
+ * This API is currently only to force the coordinator to refresh its config.
+ * For all other purposes, use {@link #updateClusterConfig}.
+ */
+ @Deprecated
+ private void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer maxCompactionTaskSlots) throws Exception
+ {
+ final String url = StringUtils.format(
"%sconfig/compaction/taskslots?ratio=%s&max=%s",
getCoordinatorURL(),
StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
@@ -197,4 +309,26 @@ public Map getCompactionStatus(String dataSource) throws Excepti
Map<String, List<Map<String, Object>>> latestSnapshots = jsonMapper.readValue(response.getContent(), new TypeReference<>() {});
return latestSnapshots.get("latestStatus").get(0);
}
+
+ public CompactionSimulateResult simulateRunOnCoordinator() throws Exception
+ {
+ final ClusterCompactionConfig clusterConfig = getClusterConfig();
+
+ final String url = StringUtils.format("%scompaction/simulate", getCoordinatorURL());
+ StatusResponseHolder response = httpClient.go(
+ new Request(HttpMethod.POST, new URL(url)).setContent(
+ "application/json",
+ jsonMapper.writeValueAsBytes(clusterConfig)
+ ),
+ responseHandler
+ ).get();
+ if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+ throw new ISE(
+ "Error while running simulation on Coordinator: status[%s], content[%s]",
+ response.getStatus(), response.getContent()
+ );
+ }
+
+ return jsonMapper.readValue(response.getContent(), new TypeReference<>() {});
+ }
}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
index 3101786a4387..4fbbbdbb92a4 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
@@ -25,8 +25,8 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
-import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
import org.apache.druid.testing.clients.CompactionResourceTestClient;
import org.apache.druid.testing.clients.TaskResponseObject;
@@ -290,19 +290,12 @@ private void submitAndVerifyCompactionConfig() throws Exception
.withSkipOffsetFromLatest(Period.ZERO)
.withMaxRowsPerSegment(Specs.MAX_ROWS_PER_SEGMENT)
.build();
- compactionResource.updateCompactionTaskSlot(0.5, 10);
+ compactionResource.updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, null, null));
compactionResource.submitCompactionConfig(dataSourceCompactionConfig);
- // Wait for compaction config to persist
- Thread.sleep(2000);
-
// Verify that the compaction config is updated correctly.
- DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig();
DataSourceCompactionConfig observedCompactionConfig
- = compactionConfig.findConfigForDatasource(fullDatasourceName).orNull();
- Assert.assertEquals(observedCompactionConfig, dataSourceCompactionConfig);
-
- observedCompactionConfig = compactionResource.getDataSourceCompactionConfig(fullDatasourceName);
+ = compactionResource.getDataSourceCompactionConfig(fullDatasourceName);
Assert.assertEquals(observedCompactionConfig, dataSourceCompactionConfig);
}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index fe6efcf47e74..48ef5994177b 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.datasketches.hll.TgtHllType;
@@ -58,7 +59,9 @@
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.transform.CompactionTransformSpec;
+import org.apache.druid.server.compaction.FixedIntervalOrderPolicy;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
@@ -99,6 +102,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
@Test(groups = {TestNGGroup.COMPACTION})
@Guice(moduleFactory = DruidTestModuleFactory.class)
@@ -115,6 +119,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
private static final String INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS = "/indexer/wikipedia_index_no_rollup_preserve_metric.json";
private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
private static final Period NO_SKIP_OFFSET = Period.seconds(0);
+ private static final FixedIntervalOrderPolicy COMPACT_NOTHING_POLICY = new FixedIntervalOrderPolicy(List.of());
@DataProvider(name = "engine")
public static Object[][] engine()
@@ -122,6 +127,12 @@ public static Object[][] engine()
return new Object[][]{{CompactionEngine.NATIVE}};
}
+ @DataProvider(name = "useSupervisors")
+ public static Object[][] useSupervisors()
+ {
+ return new Object[][]{{true}, {false}};
+ }
+
@Inject
protected CompactionResourceTestClient compactionResource;
@@ -621,7 +632,7 @@ public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception
checkCompactionIntervals(intervalsBeforeCompaction);
getAndAssertCompactionStatus(
fullDatasourceName,
- AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ AutoCompactionSnapshot.ScheduleStatus.RUNNING,
Matchers.equalTo(0L),
Matchers.greaterThan(0L),
Matchers.greaterThan(0L),
@@ -639,7 +650,7 @@ public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception
checkCompactionIntervals(intervalsBeforeCompaction);
getAndAssertCompactionStatus(
fullDatasourceName,
- AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ AutoCompactionSnapshot.ScheduleStatus.RUNNING,
Matchers.equalTo(0L),
Matchers.greaterThan(0L),
Matchers.equalTo(0L),
@@ -770,7 +781,7 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception
checkCompactionIntervals(intervalsBeforeCompaction);
getAndAssertCompactionStatus(
fullDatasourceName,
- AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ AutoCompactionSnapshot.ScheduleStatus.RUNNING,
Matchers.greaterThan(0L),
Matchers.greaterThan(0L),
Matchers.equalTo(0L),
@@ -792,7 +803,7 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception
checkCompactionIntervals(intervalsBeforeCompaction);
getAndAssertCompactionStatus(
fullDatasourceName,
- AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ AutoCompactionSnapshot.ScheduleStatus.RUNNING,
Matchers.equalTo(0L),
Matchers.greaterThan(0L),
Matchers.equalTo(0L),
@@ -1586,13 +1597,15 @@ public void testAutoCompactionDutyWithDimensionsSpec(CompactionEngine engine) th
}
}
- @Test
- public void testAutoCompactionDutyWithFilter() throws Exception
+ @Test(dataProvider = "useSupervisors")
+ public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exception
{
+ updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null));
+
loadData(INDEX_TASK_WITH_DIMENSION_SPEC);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
- intervalsBeforeCompaction.sort(null);
+ intervalsBeforeCompaction.sort(Ordering.natural().reversed());
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
@@ -1616,7 +1629,7 @@ public void testAutoCompactionDutyWithFilter() throws Exception
false,
CompactionEngine.NATIVE
);
- forceTriggerAutoCompaction(2);
+ forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2);
// For dim "page", result should only contain value "Striker Eureka"
queryAndResultFields = ImmutableMap.of(
@@ -1629,19 +1642,21 @@ public void testAutoCompactionDutyWithFilter() throws Exception
List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
// Verify compacted segments does not get compacted again
- forceTriggerAutoCompaction(2);
+ forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2);
List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
}
}
- @Test
- public void testAutoCompationDutyWithMetricsSpec() throws Exception
+ @Test(dataProvider = "useSupervisors")
+ public void testAutoCompationDutyWithMetricsSpec(boolean useSupervisors) throws Exception
{
+ updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null));
+
loadData(INDEX_TASK_WITH_DIMENSION_SPEC);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
- intervalsBeforeCompaction.sort(null);
+ intervalsBeforeCompaction.sort(Ordering.natural().reversed());
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
@@ -1664,7 +1679,7 @@ public void testAutoCompationDutyWithMetricsSpec() throws Exception
false,
CompactionEngine.NATIVE
);
- forceTriggerAutoCompaction(2);
+ forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2);
// Result should be the same with the addition of new metrics, "double_sum_added" and "long_sum_added".
// These new metrics should have the same value as the input field "added"
@@ -1686,7 +1701,7 @@ public void testAutoCompationDutyWithMetricsSpec() throws Exception
List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
// Verify compacted segments does not get compacted again
- forceTriggerAutoCompaction(2);
+ forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2);
List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
}
@@ -1807,6 +1822,12 @@ private void verifyQuery(String queryResource, Map keyValueToRep
queryHelper.testQueriesFromString(queryResponseTemplate);
}
+ private void updateClusterConfig(ClusterCompactionConfig clusterConfig) throws Exception
+ {
+ compactionResource.updateClusterConfig(clusterConfig);
+ LOG.info("Updated cluster config to [%s]", clusterConfig);
+ }
+
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
@@ -1925,15 +1946,8 @@ private void submitCompactionConfig(
Thread.sleep(2000);
// Verify that the compaction config is updated correctly.
- DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig();
DataSourceCompactionConfig foundDataSourceCompactionConfig
- = compactionConfig.findConfigForDatasource(fullDatasourceName).orNull();
- Assert.assertNotNull(foundDataSourceCompactionConfig);
- Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig());
- Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), partitionsSpec);
- Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), skipOffsetFromLatest);
-
- foundDataSourceCompactionConfig = compactionResource.getDataSourceCompactionConfig(fullDatasourceName);
+ = compactionResource.getDataSourceCompactionConfig(fullDatasourceName);
Assert.assertNotNull(foundDataSourceCompactionConfig);
Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig());
Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), partitionsSpec);
@@ -1945,15 +1959,56 @@ private void deleteCompactionConfig() throws Exception
compactionResource.deleteDataSourceCompactionConfig(fullDatasourceName);
// Verify that the compaction config is updated correctly.
- DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig();
+ DruidCompactionConfig compactionConfig = DruidCompactionConfig
+ .empty().withDatasourceConfigs(compactionResource.getAllCompactionConfigs());
DataSourceCompactionConfig foundDataSourceCompactionConfig
= compactionConfig.findConfigForDatasource(fullDatasourceName).orNull();
Assert.assertNull(foundDataSourceCompactionConfig);
}
+ /**
+ * Performs compaction of the given intervals of the test datasource,
+ * {@link #fullDatasourceName}, and verifies the total number of segments in
+ * the datasource after compaction.
+ */
+ private void forceTriggerAutoCompaction(
+ List intervals,
+ boolean useSupervisors,
+ int numExpectedSegmentsAfterCompaction
+ ) throws Exception
+ {
+ if (useSupervisors) {
+ // Enable compaction for the requested intervals
+ final FixedIntervalOrderPolicy policy = new FixedIntervalOrderPolicy(
+ intervals.stream().map(
+ interval -> new FixedIntervalOrderPolicy.Candidate(fullDatasourceName, Intervals.of(interval))
+ ).collect(Collectors.toList())
+ );
+ updateClusterConfig(
+ new ClusterCompactionConfig(0.5, intervals.size(), policy, true, null)
+ );
+
+ // Wait for scheduler to pick up the compaction job
+ Thread.sleep(30_000);
+ waitForCompactionToFinish(numExpectedSegmentsAfterCompaction);
+
+ // Disable all compaction
+ updateClusterConfig(
+ new ClusterCompactionConfig(0.5, intervals.size(), COMPACT_NOTHING_POLICY, true, null)
+ );
+ } else {
+ forceTriggerAutoCompaction(numExpectedSegmentsAfterCompaction);
+ }
+ }
+
private void forceTriggerAutoCompaction(int numExpectedSegmentsAfterCompaction) throws Exception
{
compactionResource.forceTriggerAutoCompaction();
+ waitForCompactionToFinish(numExpectedSegmentsAfterCompaction);
+ }
+
+ private void waitForCompactionToFinish(int numExpectedSegmentsAfterCompaction)
+ {
waitForAllTasksToCompleteForDataSource(fullDatasourceName);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
@@ -2042,16 +2097,30 @@ private void verifySegmentsCompactedDimensionSchema(List dimens
private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCompactionTaskSlots) throws Exception
{
- compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio, maxCompactionTaskSlots);
- // Verify that the compaction config is updated correctly.
- DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig();
- Assert.assertEquals(compactionConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio);
- Assert.assertEquals(compactionConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
+ final ClusterCompactionConfig oldConfig = compactionResource.getClusterConfig();
+ compactionResource.updateClusterConfig(
+ new ClusterCompactionConfig(
+ compactionTaskSlotRatio,
+ maxCompactionTaskSlots,
+ oldConfig.getCompactionPolicy(),
+ oldConfig.isUseSupervisors(),
+ oldConfig.getEngine()
+ )
+ );
+
+ // Verify that the compaction config is updated correctly
+ final ClusterCompactionConfig updatedConfig = compactionResource.getClusterConfig();
+ Assert.assertEquals(updatedConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio);
+ Assert.assertEquals(updatedConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
+ LOG.info(
+ "Updated compactionTaskSlotRatio[%s] and maxCompactionTaskSlots[%d]",
+ compactionTaskSlotRatio, maxCompactionTaskSlots
+ );
}
private void getAndAssertCompactionStatus(
String fullDatasourceName,
- AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
+ AutoCompactionSnapshot.ScheduleStatus scheduleStatus,
Matcher<Long> bytesAwaitingCompactionMatcher,
Matcher<Long> bytesCompactedMatcher,
Matcher<Long> bytesSkippedMatcher,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
index 730d4312aa4a..505e261b2687 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
@@ -57,7 +57,8 @@ public void testUpgradeAutoCompactionConfigurationWhenConfigurationFromOlderVers
{
// Verify that compaction config already exist. This config was inserted manually into the database using SQL script.
// This auto compaction configuration payload is from Druid 0.21.0
- DruidCompactionConfig coordinatorCompactionConfig = compactionResource.getCompactionConfig();
+ DruidCompactionConfig coordinatorCompactionConfig = DruidCompactionConfig.empty()
+ .withDatasourceConfigs(compactionResource.getAllCompactionConfigs());
DataSourceCompactionConfig foundDataSourceCompactionConfig
= coordinatorCompactionConfig.findConfigForDatasource(UPGRADE_DATASOURCE_NAME).orNull();
Assert.assertNotNull(foundDataSourceCompactionConfig);
@@ -100,13 +101,9 @@ public void testUpgradeAutoCompactionConfigurationWhenConfigurationFromOlderVers
.build();
compactionResource.submitCompactionConfig(compactionConfig);
- // Wait for compaction config to persist
- Thread.sleep(2000);
-
// Verify that compaction was successfully updated
- coordinatorCompactionConfig = compactionResource.getCompactionConfig();
foundDataSourceCompactionConfig
- = coordinatorCompactionConfig.findConfigForDatasource(UPGRADE_DATASOURCE_NAME).orNull();
+ = compactionResource.getDataSourceCompactionConfig(UPGRADE_DATASOURCE_NAME);
Assert.assertNotNull(foundDataSourceCompactionConfig);
Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig());
Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), newPartitionsSpec);
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index edeb16665ba4..bcd3e4348e52 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -25,9 +25,11 @@
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.metadata.DataSourceInformation;
+import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
@@ -74,4 +76,15 @@ public interface CoordinatorClient
* Retrieves list of datasources with used segments.
*/
ListenableFuture<Set<String>> fetchDataSourcesWithUsedSegments();
+
+ /**
+ * Gets the latest compaction snapshots of one or all datasources.
+ *
+ * API: {@code GET /druid/coordinator/v1/compaction/status}
+ *
+ * @param dataSource If passed as non-null, then the returned list contains only
+ * the snapshot for this datasource.
+ */
+ ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource);
+
}
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
index 224a766c719d..50cd58e0eb33 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
@@ -36,11 +36,13 @@
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.metadata.DataSourceInformation;
+import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.server.coordination.LoadableDataSegment;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -199,4 +201,25 @@ public ListenableFuture> fetchDataSourcesWithUsedSegments()
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<>() {})
);
}
+
+ @Override
+ public ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource)
+ {
+ final StringBuilder pathBuilder = new StringBuilder("/druid/coordinator/v1/compaction/status");
+ if (dataSource != null && !dataSource.isEmpty()) {
+ pathBuilder.append("?").append("dataSource=").append(StringUtils.urlEncode(dataSource));
+ }
+
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.GET, pathBuilder.toString()),
+ new BytesFullResponseHandler()
+ ),
+ holder -> JacksonUtils.readValue(
+ jsonMapper,
+ holder.getContent(),
+ CompactionStatusResponse.class
+ )
+ );
+ }
}
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java
index 7ccdcedb2bd3..a1782d33112b 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java
@@ -31,8 +31,6 @@
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
-import org.apache.druid.server.compaction.CompactionProgressResponse;
-import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.server.http.SegmentsToUpdateFilter;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
@@ -133,12 +131,6 @@ public ListenableFuture getTotalWorkerCapacity(
throw new UnsupportedOperationException();
}
- @Override
- public ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource)
- {
- throw new UnsupportedOperationException();
- }
-
@Override
public ListenableFuture markNonOvershadowedSegmentsAsUsed(String dataSource)
{
@@ -178,12 +170,6 @@ public ListenableFuture markSegmentAsUnused(SegmentId seg
throw new UnsupportedOperationException();
}
- @Override
- public ListenableFuture<CompactionProgressResponse> getBytesAwaitingCompaction(String dataSource)
- {
- throw new UnsupportedOperationException();
- }
-
@Override
public ListenableFuture<Boolean> isCompactionSupervisorEnabled()
{
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index b14c287e734f..57899844d8b3 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -35,8 +35,6 @@
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
-import org.apache.druid.server.compaction.CompactionProgressResponse;
-import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.server.http.SegmentsToUpdateFilter;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
@@ -224,23 +222,6 @@ ListenableFuture