From 95809cf5c29e654ceb760dae00f5d470db0a1236 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Sat, 15 Jun 2024 14:41:37 +0530 Subject: [PATCH] KAFKA-15265: Reapply dynamic remote configs after broker restart The below remote log configs can be configured dynamically: 1. remote.log.manager.copy.max.bytes.per.second 2. remote.log.manager.fetch.max.bytes.per.second and 3. remote.log.index.file.cache.total.size.bytes If those values are dynamically configured, then during the broker restart, it load the static value from the config file instead of the dynamic values. --- .../kafka/log/remote/RemoteLogManager.java | 37 ++++--- .../scala/kafka/server/BrokerServer.scala | 2 +- .../main/scala/kafka/server/KafkaConfig.scala | 6 ++ .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../log/remote/RemoteLogManagerTest.java | 50 +++++---- .../server/DynamicBrokerConfigTest.scala | 101 ++++++++++-------- .../kafka/server/ReplicaManagerTest.scala | 12 +-- .../storage/RemoteLogManagerConfig.java | 12 --- 8 files changed, 126 insertions(+), 96 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 2c64fa829fed3..7095f155544d5 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -24,6 +24,7 @@ import kafka.log.remote.quota.RLMQuotaManager; import kafka.log.remote.quota.RLMQuotaManagerConfig; import kafka.server.BrokerTopicStats; +import kafka.server.KafkaConfig; import kafka.server.QuotaType; import kafka.server.StopPartition; import org.apache.kafka.common.KafkaException; @@ -151,7 +152,7 @@ public class RemoteLogManager implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class); private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader"; - private final RemoteLogManagerConfig rlmConfig; + private final KafkaConfig config; private final int brokerId; private final String logDir; private final Time time; @@ -192,7 +193,7 @@ public class RemoteLogManager implements Closeable { /** * Creates RemoteLogManager instance with the given arguments. * - * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level. + * @param config Configuration required for remote logging subsystem(tiered storage) at the broker level. * @param brokerId id of the current broker. * @param logDir directory of Kafka log segments. * @param time Time instance. @@ -202,7 +203,7 @@ public class RemoteLogManager implements Closeable { * @param brokerTopicStats BrokerTopicStats instance to update the respective metrics. * @param metrics Metrics instance */ - public RemoteLogManager(RemoteLogManagerConfig rlmConfig, + public RemoteLogManager(KafkaConfig config, int brokerId, String logDir, String clusterId, @@ -211,7 +212,7 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig, BiConsumer updateRemoteLogStartOffset, BrokerTopicStats brokerTopicStats, Metrics metrics) throws IOException { - this.rlmConfig = rlmConfig; + this.config = config; this.brokerId = brokerId; this.logDir = logDir; this.clusterId = clusterId; @@ -226,7 +227,8 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig, rlmCopyQuotaManager = createRLMCopyQuotaManager(); rlmFetchQuotaManager = createRLMFetchQuotaManager(); - indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir); + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); + indexCache = new RemoteIndexCache(config.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir); delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs(); rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize()); @@ -274,12 +276,12 @@ Duration quotaTimeout() { } RLMQuotaManager createRLMCopyQuotaManager() { - return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$, + return new RLMQuotaManager(copyQuotaManagerConfig(config), metrics, QuotaType.RLMCopy$.MODULE$, "Tracking copy byte-rate for Remote Log Manager", time); } RLMQuotaManager createRLMFetchQuotaManager() { - return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMFetch$.MODULE$, + return new RLMQuotaManager(fetchQuotaManagerConfig(config), metrics, QuotaType.RLMFetch$.MODULE$, "Tracking fetch byte-rate for Remote Log Manager", time); } @@ -287,14 +289,16 @@ public boolean isRemoteLogFetchQuotaExceeded() { return rlmFetchQuotaManager.isQuotaExceeded(); } - static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) { - return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerCopyMaxBytesPerSecond(), + static RLMQuotaManagerConfig copyQuotaManagerConfig(KafkaConfig config) { + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); + return new RLMQuotaManagerConfig(config.remoteLogManagerCopyMaxBytesPerSecond(), rlmConfig.remoteLogManagerCopyNumQuotaSamples(), rlmConfig.remoteLogManagerCopyQuotaWindowSizeSeconds()); } - static RLMQuotaManagerConfig fetchQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) { - return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerFetchMaxBytesPerSecond(), + static RLMQuotaManagerConfig fetchQuotaManagerConfig(KafkaConfig config) { + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); + return new RLMQuotaManagerConfig(config.remoteLogManagerFetchMaxBytesPerSecond(), rlmConfig.remoteLogManagerFetchNumQuotaSamples(), rlmConfig.remoteLogManagerFetchQuotaWindowSizeSeconds()); } @@ -311,6 +315,7 @@ private T createDelegate(ClassLoader classLoader, String className) { @SuppressWarnings("removal") RemoteStorageManager createRemoteStorageManager() { + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); return java.security.AccessController.doPrivileged(new PrivilegedAction() { private final String classPath = rlmConfig.remoteStorageManagerClassPath(); @@ -327,13 +332,14 @@ public RemoteStorageManager run() { } private void configureRSM() { - final Map rsmProps = new HashMap<>(rlmConfig.remoteStorageManagerProps()); + final Map rsmProps = new HashMap<>(config.remoteLogManagerConfig().remoteStorageManagerProps()); rsmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId); remoteLogStorageManager.configure(rsmProps); } @SuppressWarnings("removal") RemoteLogMetadataManager createRemoteLogMetadataManager() { + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); return java.security.AccessController.doPrivileged(new PrivilegedAction() { private final String classPath = rlmConfig.remoteLogMetadataManagerClassPath(); @@ -360,7 +366,7 @@ private void configureRLMM() { rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", e.securityProtocol().name); }); // update the remoteLogMetadataProps here to override endpoint config if any - rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps()); + rlmmProps.putAll(config.remoteLogManagerConfig().remoteLogMetadataManagerProps()); rlmmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId); rlmmProps.put(LOG_DIR_CONFIG, logDir); @@ -412,7 +418,7 @@ public void onLeadershipChange(Set partitionsBecomeLeader, Map topicIds) { LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); - if (this.rlmConfig.isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) { + if (config.remoteLogManagerConfig().isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) { throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled"); } @@ -1742,9 +1748,10 @@ public Future asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer convertToLeaderOrFollower) { + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition, topicIdPartition -> { - RLMTask task = new RLMTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes()); + RLMTask task = new RLMTask(topicIdPartition, rlmConfig.remoteLogMetadataCustomMetadataMaxBytes()); // set this upfront when it is getting initialized instead of doing it after scheduling. convertToLeaderOrFollower.accept(task); LOGGER.info("Created a new task: {} and getting scheduled", task); diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 7e225440abf61..a03128ab9f52b 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -621,7 +621,7 @@ class BrokerServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { - Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, + Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { logManager.getLog(tp).foreach { log => diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index caae2217c8dee..f2cddda700fca 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -871,6 +871,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP) + def remoteLogIndexFileCacheTotalSizeBytes: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) + + def remoteLogManagerCopyMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP) + + def remoteLogManagerFetchMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP) + validateValues() @nowarn("cat=deprecation") diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index dffd2e7f697fb..3154ab1c344eb 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -691,7 +691,7 @@ class KafkaServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { - Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, + Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { logManager.getLog(tp).foreach { log => diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index f18658088b6f8..eeac6bdbae6b4 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -193,7 +193,7 @@ public class RemoteLogManagerTest { private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class); private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); - private RemoteLogManagerConfig remoteLogManagerConfig = null; + private KafkaConfig config; private BrokerTopicStats brokerTopicStats = null; private final Metrics metrics = new Metrics(time); @@ -223,10 +223,11 @@ void setUp() throws Exception { Properties props = kafka.utils.TestUtils.createDummyBrokerConfig(); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); - remoteLogManagerConfig = createRLMConfig(props); + createRLMConfig(props); + config = KafkaConfig.fromProps(props); brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled()); - remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), brokerTopicStats, metrics) { @@ -378,10 +379,13 @@ void testRemoteLogMetadataManagerWithEndpointConfig() { @Test void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOException { Properties props = new Properties(); + props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); // override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix" props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL"); + createRLMConfig(props); + KafkaConfig config = KafkaConfig.fromProps(props); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - createRLMConfig(props), + config, brokerId, logDir, clusterId, @@ -1271,7 +1275,7 @@ private void verifyLogSegmentData(LogSegmentData logSegmentData, void testGetClassLoaderAwareRemoteStorageManager() throws Exception { ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); try (RemoteLogManager remoteLogManager = - new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + new RemoteLogManager(config, brokerId, logDir, clusterId, time, t -> Optional.empty(), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1533,7 +1537,7 @@ void testIdempotentClose() throws IOException { public void testRemoveMetricsOnClose() throws IOException { MockedConstruction mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class); try { - RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, + RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; @@ -1928,7 +1932,7 @@ public void testFindLogStartOffset() throws RemoteStorageException, IOException else return Collections.emptyIterator(); }); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1953,7 +1957,7 @@ public void testFindLogStartOffsetFallbackToLocalLogStartOffsetWhenRemoteIsEmpty when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt())) .thenReturn(Collections.emptyIterator()); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1987,7 +1991,7 @@ public void testLogStartOffsetUpdatedOnStartup() throws RemoteStorageException, }); AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2419,7 +2423,7 @@ private void verifyDeleteLogSegment(List segmentMetada @Test public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException { AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2561,7 +2565,7 @@ public void testReadForMissingFirstBatchInRemote() throws RemoteStorageException ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config, brokerId, logDir, clusterId, @@ -2634,7 +2638,7 @@ public void testReadForFirstBatchMoreThanMaxFetchBytes(boolean minOneMessage) th ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config, brokerId, logDir, clusterId, @@ -2719,7 +2723,7 @@ public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException try (RemoteLogManager remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config, brokerId, logDir, clusterId, @@ -2764,18 +2768,23 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l @Test public void testCopyQuotaManagerConfig() { Properties defaultProps = new Properties(); - RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); + defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); + createRLMConfig(defaultProps); + KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); + customProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); - RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); - RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig); + createRLMConfig(customProps); + KafkaConfig config = KafkaConfig.fromProps(customProps); + + RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config); assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples()); assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds()); @@ -2784,17 +2793,22 @@ public void testCopyQuotaManagerConfig() { @Test public void testFetchQuotaManagerConfig() { Properties defaultProps = new Properties(); - RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); + defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); + createRLMConfig(defaultProps); + KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); + RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); + customProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); - RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); + createRLMConfig(customProps); + KafkaConfig rlmConfig = KafkaConfig.fromProps(customProps); RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig); assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples()); diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 3ba158891574e..8dad66919bc06 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -47,7 +47,7 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.mockito.ArgumentMatchers.anyString import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} -import org.mockito.Mockito.{mock, when} +import org.mockito.Mockito.{mock, verify, verifyNoMoreInteractions, when} import scala.annotation.nowarn import scala.jdk.CollectionConverters._ @@ -852,49 +852,64 @@ class DynamicBrokerConfigTest { @Test def testRemoteLogManagerCopyQuotaUpdates(): Unit = { - testRemoteLogManagerQuotaUpdates( - RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, - RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, - (remoteLogManager, quota) => Mockito.verify(remoteLogManager).updateCopyQuota(quota) - ) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) + val config = KafkaConfig.fromProps(props) + val serverMock: KafkaServer = mock(classOf[KafkaServer]) + val remoteLogManager = mock(classOf[RemoteLogManager]) + + Mockito.when(serverMock.config).thenReturn(config) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) + + config.dynamicConfig.initialize(None, None) + config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) + + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, + config.remoteLogManagerCopyMaxBytesPerSecond) + + // Update default config + props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "100") + config.dynamicConfig.updateDefaultConfig(props) + assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond) + verify(remoteLogManager).updateCopyQuota(100) + + // Update per broker config + props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "200") + config.dynamicConfig.updateBrokerConfig(0, props) + assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond) + verify(remoteLogManager).updateCopyQuota(200) + + verifyNoMoreInteractions(remoteLogManager) } @Test def testRemoteLogManagerFetchQuotaUpdates(): Unit = { - testRemoteLogManagerQuotaUpdates( - RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, - RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, - (remoteLogManager, quota) => Mockito.verify(remoteLogManager).updateFetchQuota(quota) - ) - } - - def testRemoteLogManagerQuotaUpdates(quotaProp: String, defaultQuota: Long, verifyMethod: (RemoteLogManager, Long) => Unit): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) val config = KafkaConfig.fromProps(props) val serverMock: KafkaServer = mock(classOf[KafkaServer]) - val remoteLogManagerMockOpt = Option(mock(classOf[RemoteLogManager])) + val remoteLogManager = mock(classOf[RemoteLogManager]) Mockito.when(serverMock.config).thenReturn(config) - Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) - assertEquals(defaultQuota, config.getLong(quotaProp)) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, + config.remoteLogManagerFetchMaxBytesPerSecond) // Update default config - props.put(quotaProp, "100") + props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "100") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(100, config.getLong(quotaProp)) - verifyMethod(remoteLogManagerMockOpt.get, 100) + assertEquals(100, config.remoteLogManagerFetchMaxBytesPerSecond) + verify(remoteLogManager).updateFetchQuota(100) // Update per broker config - props.put(quotaProp, "200") + props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "200") config.dynamicConfig.updateBrokerConfig(0, props) - assertEquals(200, config.getLong(quotaProp)) - verifyMethod(remoteLogManagerMockOpt.get, 200) + assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond) + verify(remoteLogManager).updateFetchQuota(200) - Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) + verifyNoMoreInteractions(remoteLogManager) } @Test @@ -906,44 +921,44 @@ class DynamicBrokerConfigTest { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) val config = KafkaConfig.fromProps(props) val serverMock: KafkaServer = mock(classOf[KafkaServer]) - val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager])) + val remoteLogManager = Mockito.mock(classOf[RemoteLogManager]) Mockito.when(serverMock.config).thenReturn(config) - Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) // Default values - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, config.getLong(indexFileCacheSizeProp)) - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.getLong(copyQuotaProp)) - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, config.getLong(fetchQuotaProp)) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, config.remoteLogIndexFileCacheTotalSizeBytes) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.remoteLogManagerCopyMaxBytesPerSecond) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, config.remoteLogManagerFetchMaxBytesPerSecond) // Update default config props.put(indexFileCacheSizeProp, "4") props.put(copyQuotaProp, "100") props.put(fetchQuotaProp, "200") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(4, config.getLong(indexFileCacheSizeProp)) - assertEquals(100, config.getLong(copyQuotaProp)) - assertEquals(200, config.getLong(fetchQuotaProp)) - Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4) - Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100) - Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(200) + assertEquals(4, config.remoteLogIndexFileCacheTotalSizeBytes) + assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond) + assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond) + verify(remoteLogManager).resizeCacheSize(4) + verify(remoteLogManager).updateCopyQuota(100) + verify(remoteLogManager).updateFetchQuota(200) // Update per broker config props.put(indexFileCacheSizeProp, "8") props.put(copyQuotaProp, "200") props.put(fetchQuotaProp, "400") config.dynamicConfig.updateBrokerConfig(0, props) - assertEquals(8, config.getLong(indexFileCacheSizeProp)) - assertEquals(200, config.getLong(copyQuotaProp)) - assertEquals(400, config.getLong(fetchQuotaProp)) - Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(8) - Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200) - Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(400) - - Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) + assertEquals(8, config.remoteLogIndexFileCacheTotalSizeBytes) + assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond) + assertEquals(400, config.remoteLogManagerFetchMaxBytesPerSecond) + verify(remoteLogManager).resizeCacheSize(8) + verify(remoteLogManager).updateCopyQuota(200) + verify(remoteLogManager).updateFetchQuota(400) + + verifyNoMoreInteractions(remoteLogManager) } def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long, diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index e44a6efa5e19e..53b342e10db8a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -4093,11 +4093,11 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) // set log reader threads number to 2 props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2.toString) - val remoteLogManagerConfig = new RemoteLogManagerConfig(props) + val config = KafkaConfig.fromProps(props) val mockLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config, 0, TestUtils.tempRelativeDir("data").getAbsolutePath, "clusterId", @@ -4199,11 +4199,11 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName) props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) - val remoteLogManagerConfig = new RemoteLogManagerConfig(props) + val config = KafkaConfig.fromProps(props) val dummyLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config, 0, TestUtils.tempRelativeDir("data").getAbsolutePath, "clusterId", diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 25f1334226ac8..480983edec42b 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -378,10 +378,6 @@ public String remoteLogMetadataManagerClassPath() { return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP); } - public long remoteLogIndexFileCacheTotalSizeBytes() { - return getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP); - } - public int remoteLogManagerThreadPoolSize() { return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP); } @@ -447,10 +443,6 @@ public Map getConfigProps(String configPrefixProp) { return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(originalsWithPrefix(prefixProp)); } - public long remoteLogManagerCopyMaxBytesPerSecond() { - return getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP); - } - public int remoteLogManagerCopyNumQuotaSamples() { return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP); } @@ -459,10 +451,6 @@ public int remoteLogManagerCopyQuotaWindowSizeSeconds() { return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP); } - public long remoteLogManagerFetchMaxBytesPerSecond() { - return getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP); - } - public int remoteLogManagerFetchNumQuotaSamples() { return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP); }