diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java index 9313ba1ff15b..5d435549bacd 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java @@ -27,6 +27,7 @@ import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; import org.apache.druid.common.config.ConfigManager; +import org.apache.druid.common.config.Configs; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.error.DruidException; import org.apache.druid.error.InternalServerError; @@ -152,6 +153,28 @@ public DruidCompactionConfig convertBytesToCompactionConfig(byte[] bytes) ); } + public boolean updateCompactionTaskSlots( + @Nullable Double compactionTaskSlotRatio, + @Nullable Integer maxCompactionTaskSlots, + AuditInfo auditInfo + ) + { + UnaryOperator operator = current -> { + final ClusterCompactionConfig currentClusterConfig = current.clusterConfig(); + final ClusterCompactionConfig updatedClusterConfig = new ClusterCompactionConfig( + Configs.valueOrDefault(compactionTaskSlotRatio, currentClusterConfig.getCompactionTaskSlotRatio()), + Configs.valueOrDefault(maxCompactionTaskSlots, currentClusterConfig.getMaxCompactionTaskSlots()), + currentClusterConfig.getCompactionPolicy(), + currentClusterConfig.isUseSupervisors(), + currentClusterConfig.getEngine() + ); + + return current.withClusterConfig(updatedClusterConfig); + }; + + return updateConfigHelper(operator, auditInfo); + } + public boolean updateClusterCompactionConfig( ClusterCompactionConfig config, AuditInfo auditInfo diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java index f24aae7baa58..561670ae628b 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java @@ -22,10 +22,8 @@ import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.audit.AuditInfo; -import org.apache.druid.common.config.Configs; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.CompactionEngine; -import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.http.security.ConfigResourceFilter; @@ -85,18 +83,9 @@ public Response setCompactionTaskLimit( return ServletResourceUtils.buildUpdateResponse(() -> true); } - final ClusterCompactionConfig currentConfig = configManager.getCurrentCompactionConfig().clusterConfig(); - final ClusterCompactionConfig updatedClusterConfig = new ClusterCompactionConfig( - Configs.valueOrDefault(compactionTaskSlotRatio, currentConfig.getCompactionTaskSlotRatio()), - Configs.valueOrDefault(maxCompactionTaskSlots, currentConfig.getMaxCompactionTaskSlots()), - currentConfig.getCompactionPolicy(), - currentConfig.isUseSupervisors(), - currentConfig.getEngine() - ); - final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); return ServletResourceUtils.buildUpdateResponse( - () -> configManager.updateClusterCompactionConfig(updatedClusterConfig, auditInfo) + () -> configManager.updateCompactionTaskSlots(compactionTaskSlotRatio, maxCompactionTaskSlots, auditInfo) ); }