diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 2c64fa829fed3..7095f155544d5 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -24,6 +24,7 @@ import kafka.log.remote.quota.RLMQuotaManager; import kafka.log.remote.quota.RLMQuotaManagerConfig; import kafka.server.BrokerTopicStats; +import kafka.server.KafkaConfig; import kafka.server.QuotaType; import kafka.server.StopPartition; import org.apache.kafka.common.KafkaException; @@ -151,7 +152,7 @@ public class RemoteLogManager implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class); private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader"; - private final RemoteLogManagerConfig rlmConfig; + private final KafkaConfig config; private final int brokerId; private final String logDir; private final Time time; @@ -192,7 +193,7 @@ public class RemoteLogManager implements Closeable { /** * Creates RemoteLogManager instance with the given arguments. * - * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level. + * @param config Configuration required for remote logging subsystem(tiered storage) at the broker level. * @param brokerId id of the current broker. * @param logDir directory of Kafka log segments. * @param time Time instance. @@ -202,7 +203,7 @@ public class RemoteLogManager implements Closeable { * @param brokerTopicStats BrokerTopicStats instance to update the respective metrics. * @param metrics Metrics instance */ - public RemoteLogManager(RemoteLogManagerConfig rlmConfig, + public RemoteLogManager(KafkaConfig config, int brokerId, String logDir, String clusterId, @@ -211,7 +212,7 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig, BiConsumer updateRemoteLogStartOffset, BrokerTopicStats brokerTopicStats, Metrics metrics) throws IOException { - this.rlmConfig = rlmConfig; + this.config = config; this.brokerId = brokerId; this.logDir = logDir; this.clusterId = clusterId; @@ -226,7 +227,8 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig, rlmCopyQuotaManager = createRLMCopyQuotaManager(); rlmFetchQuotaManager = createRLMFetchQuotaManager(); - indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir); + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); + indexCache = new RemoteIndexCache(config.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir); delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs(); rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize()); @@ -274,12 +276,12 @@ Duration quotaTimeout() { } RLMQuotaManager createRLMCopyQuotaManager() { - return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$, + return new RLMQuotaManager(copyQuotaManagerConfig(config), metrics, QuotaType.RLMCopy$.MODULE$, "Tracking copy byte-rate for Remote Log Manager", time); } RLMQuotaManager createRLMFetchQuotaManager() { - return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMFetch$.MODULE$, + return new RLMQuotaManager(fetchQuotaManagerConfig(config), metrics, QuotaType.RLMFetch$.MODULE$, "Tracking fetch byte-rate for Remote Log Manager", time); } @@ -287,14 +289,16 @@ public boolean isRemoteLogFetchQuotaExceeded() { return rlmFetchQuotaManager.isQuotaExceeded(); } - static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) { - return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerCopyMaxBytesPerSecond(), + static RLMQuotaManagerConfig copyQuotaManagerConfig(KafkaConfig config) { + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); + return new RLMQuotaManagerConfig(config.remoteLogManagerCopyMaxBytesPerSecond(), rlmConfig.remoteLogManagerCopyNumQuotaSamples(), rlmConfig.remoteLogManagerCopyQuotaWindowSizeSeconds()); } - static RLMQuotaManagerConfig fetchQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) { - return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerFetchMaxBytesPerSecond(), + static RLMQuotaManagerConfig fetchQuotaManagerConfig(KafkaConfig config) { + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); + return new RLMQuotaManagerConfig(config.remoteLogManagerFetchMaxBytesPerSecond(), rlmConfig.remoteLogManagerFetchNumQuotaSamples(), rlmConfig.remoteLogManagerFetchQuotaWindowSizeSeconds()); } @@ -311,6 +315,7 @@ private T createDelegate(ClassLoader classLoader, String className) { @SuppressWarnings("removal") RemoteStorageManager createRemoteStorageManager() { + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); return java.security.AccessController.doPrivileged(new PrivilegedAction() { private final String classPath = rlmConfig.remoteStorageManagerClassPath(); @@ -327,13 +332,14 @@ public RemoteStorageManager run() { } private void configureRSM() { - final Map rsmProps = new HashMap<>(rlmConfig.remoteStorageManagerProps()); + final Map rsmProps = new HashMap<>(config.remoteLogManagerConfig().remoteStorageManagerProps()); rsmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId); remoteLogStorageManager.configure(rsmProps); } @SuppressWarnings("removal") RemoteLogMetadataManager createRemoteLogMetadataManager() { + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); return java.security.AccessController.doPrivileged(new PrivilegedAction() { private final String classPath = rlmConfig.remoteLogMetadataManagerClassPath(); @@ -360,7 +366,7 @@ private void configureRLMM() { rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", e.securityProtocol().name); }); // update the remoteLogMetadataProps here to override endpoint config if any - rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps()); + rlmmProps.putAll(config.remoteLogManagerConfig().remoteLogMetadataManagerProps()); rlmmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId); rlmmProps.put(LOG_DIR_CONFIG, logDir); @@ -412,7 +418,7 @@ public void onLeadershipChange(Set partitionsBecomeLeader, Map topicIds) { LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); - if (this.rlmConfig.isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) { + if (config.remoteLogManagerConfig().isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) { throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled"); } @@ -1742,9 +1748,10 @@ public Future asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer convertToLeaderOrFollower) { + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition, topicIdPartition -> { - RLMTask task = new RLMTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes()); + RLMTask task = new RLMTask(topicIdPartition, rlmConfig.remoteLogMetadataCustomMetadataMaxBytes()); // set this upfront when it is getting initialized instead of doing it after scheduling. convertToLeaderOrFollower.accept(task); LOGGER.info("Created a new task: {} and getting scheduled", task); diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 7e225440abf61..a03128ab9f52b 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -621,7 +621,7 @@ class BrokerServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { - Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, + Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { logManager.getLog(tp).foreach { log => diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index caae2217c8dee..f2cddda700fca 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -871,6 +871,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP) + def remoteLogIndexFileCacheTotalSizeBytes: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) + + def remoteLogManagerCopyMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP) + + def remoteLogManagerFetchMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP) + validateValues() @nowarn("cat=deprecation") diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index dffd2e7f697fb..3154ab1c344eb 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -691,7 +691,7 @@ class KafkaServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { - Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, + Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { logManager.getLog(tp).foreach { log => diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index f18658088b6f8..eeac6bdbae6b4 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -193,7 +193,7 @@ public class RemoteLogManagerTest { private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class); private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); - private RemoteLogManagerConfig remoteLogManagerConfig = null; + private KafkaConfig config; private BrokerTopicStats brokerTopicStats = null; private final Metrics metrics = new Metrics(time); @@ -223,10 +223,11 @@ void setUp() throws Exception { Properties props = kafka.utils.TestUtils.createDummyBrokerConfig(); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); - remoteLogManagerConfig = createRLMConfig(props); + createRLMConfig(props); + config = KafkaConfig.fromProps(props); brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled()); - remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), brokerTopicStats, metrics) { @@ -378,10 +379,13 @@ void testRemoteLogMetadataManagerWithEndpointConfig() { @Test void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOException { Properties props = new Properties(); + props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); // override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix" props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL"); + createRLMConfig(props); + KafkaConfig config = KafkaConfig.fromProps(props); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - createRLMConfig(props), + config, brokerId, logDir, clusterId, @@ -1271,7 +1275,7 @@ private void verifyLogSegmentData(LogSegmentData logSegmentData, void testGetClassLoaderAwareRemoteStorageManager() throws Exception { ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); try (RemoteLogManager remoteLogManager = - new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + new RemoteLogManager(config, brokerId, logDir, clusterId, time, t -> Optional.empty(), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1533,7 +1537,7 @@ void testIdempotentClose() throws IOException { public void testRemoveMetricsOnClose() throws IOException { MockedConstruction mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class); try { - RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, + RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; @@ -1928,7 +1932,7 @@ public void testFindLogStartOffset() throws RemoteStorageException, IOException else return Collections.emptyIterator(); }); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1953,7 +1957,7 @@ public void testFindLogStartOffsetFallbackToLocalLogStartOffsetWhenRemoteIsEmpty when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt())) .thenReturn(Collections.emptyIterator()); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1987,7 +1991,7 @@ public void testLogStartOffsetUpdatedOnStartup() throws RemoteStorageException, }); AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2419,7 +2423,7 @@ private void verifyDeleteLogSegment(List segmentMetada @Test public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException { AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2561,7 +2565,7 @@ public void testReadForMissingFirstBatchInRemote() throws RemoteStorageException ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config, brokerId, logDir, clusterId, @@ -2634,7 +2638,7 @@ public void testReadForFirstBatchMoreThanMaxFetchBytes(boolean minOneMessage) th ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config, brokerId, logDir, clusterId, @@ -2719,7 +2723,7 @@ public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException try (RemoteLogManager remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config, brokerId, logDir, clusterId, @@ -2764,18 +2768,23 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l @Test public void testCopyQuotaManagerConfig() { Properties defaultProps = new Properties(); - RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); + defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); + createRLMConfig(defaultProps); + KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); + customProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); - RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); - RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig); + createRLMConfig(customProps); + KafkaConfig config = KafkaConfig.fromProps(customProps); + + RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config); assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples()); assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds()); @@ -2784,17 +2793,22 @@ public void testCopyQuotaManagerConfig() { @Test public void testFetchQuotaManagerConfig() { Properties defaultProps = new Properties(); - RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); + defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); + createRLMConfig(defaultProps); + KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); + RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); + customProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); - RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); + createRLMConfig(customProps); + KafkaConfig rlmConfig = KafkaConfig.fromProps(customProps); RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig); assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples()); diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 3ba158891574e..8dad66919bc06 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -47,7 +47,7 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.mockito.ArgumentMatchers.anyString import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} -import org.mockito.Mockito.{mock, when} +import org.mockito.Mockito.{mock, verify, verifyNoMoreInteractions, when} import scala.annotation.nowarn import scala.jdk.CollectionConverters._ @@ -852,49 +852,64 @@ class DynamicBrokerConfigTest { @Test def testRemoteLogManagerCopyQuotaUpdates(): Unit = { - testRemoteLogManagerQuotaUpdates( - RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, - RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, - (remoteLogManager, quota) => Mockito.verify(remoteLogManager).updateCopyQuota(quota) - ) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) + val config = KafkaConfig.fromProps(props) + val serverMock: KafkaServer = mock(classOf[KafkaServer]) + val remoteLogManager = mock(classOf[RemoteLogManager]) + + Mockito.when(serverMock.config).thenReturn(config) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) + + config.dynamicConfig.initialize(None, None) + config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) + + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, + config.remoteLogManagerCopyMaxBytesPerSecond) + + // Update default config + props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "100") + config.dynamicConfig.updateDefaultConfig(props) + assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond) + verify(remoteLogManager).updateCopyQuota(100) + + // Update per broker config + props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "200") + config.dynamicConfig.updateBrokerConfig(0, props) + assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond) + verify(remoteLogManager).updateCopyQuota(200) + + verifyNoMoreInteractions(remoteLogManager) } @Test def testRemoteLogManagerFetchQuotaUpdates(): Unit = { - testRemoteLogManagerQuotaUpdates( - RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, - RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, - (remoteLogManager, quota) => Mockito.verify(remoteLogManager).updateFetchQuota(quota) - ) - } - - def testRemoteLogManagerQuotaUpdates(quotaProp: String, defaultQuota: Long, verifyMethod: (RemoteLogManager, Long) => Unit): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) val config = KafkaConfig.fromProps(props) val serverMock: KafkaServer = mock(classOf[KafkaServer]) - val remoteLogManagerMockOpt = Option(mock(classOf[RemoteLogManager])) + val remoteLogManager = mock(classOf[RemoteLogManager]) Mockito.when(serverMock.config).thenReturn(config) - Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) - assertEquals(defaultQuota, config.getLong(quotaProp)) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, + config.remoteLogManagerFetchMaxBytesPerSecond) // Update default config - props.put(quotaProp, "100") + props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "100") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(100, config.getLong(quotaProp)) - verifyMethod(remoteLogManagerMockOpt.get, 100) + assertEquals(100, config.remoteLogManagerFetchMaxBytesPerSecond) + verify(remoteLogManager).updateFetchQuota(100) // Update per broker config - props.put(quotaProp, "200") + props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "200") config.dynamicConfig.updateBrokerConfig(0, props) - assertEquals(200, config.getLong(quotaProp)) - verifyMethod(remoteLogManagerMockOpt.get, 200) + assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond) + verify(remoteLogManager).updateFetchQuota(200) - Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) + verifyNoMoreInteractions(remoteLogManager) } @Test @@ -906,44 +921,44 @@ class DynamicBrokerConfigTest { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) val config = KafkaConfig.fromProps(props) val serverMock: KafkaServer = mock(classOf[KafkaServer]) - val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager])) + val remoteLogManager = Mockito.mock(classOf[RemoteLogManager]) Mockito.when(serverMock.config).thenReturn(config) - Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) // Default values - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, config.getLong(indexFileCacheSizeProp)) - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.getLong(copyQuotaProp)) - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, config.getLong(fetchQuotaProp)) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, config.remoteLogIndexFileCacheTotalSizeBytes) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.remoteLogManagerCopyMaxBytesPerSecond) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, config.remoteLogManagerFetchMaxBytesPerSecond) // Update default config props.put(indexFileCacheSizeProp, "4") props.put(copyQuotaProp, "100") props.put(fetchQuotaProp, "200") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(4, config.getLong(indexFileCacheSizeProp)) - assertEquals(100, config.getLong(copyQuotaProp)) - assertEquals(200, config.getLong(fetchQuotaProp)) - Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4) - Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100) - Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(200) + assertEquals(4, config.remoteLogIndexFileCacheTotalSizeBytes) + assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond) + assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond) + verify(remoteLogManager).resizeCacheSize(4) + verify(remoteLogManager).updateCopyQuota(100) + verify(remoteLogManager).updateFetchQuota(200) // Update per broker config props.put(indexFileCacheSizeProp, "8") props.put(copyQuotaProp, "200") props.put(fetchQuotaProp, "400") config.dynamicConfig.updateBrokerConfig(0, props) - assertEquals(8, config.getLong(indexFileCacheSizeProp)) - assertEquals(200, config.getLong(copyQuotaProp)) - assertEquals(400, config.getLong(fetchQuotaProp)) - Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(8) - Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200) - Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(400) - - Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) + assertEquals(8, config.remoteLogIndexFileCacheTotalSizeBytes) + assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond) + assertEquals(400, config.remoteLogManagerFetchMaxBytesPerSecond) + verify(remoteLogManager).resizeCacheSize(8) + verify(remoteLogManager).updateCopyQuota(200) + verify(remoteLogManager).updateFetchQuota(400) + + verifyNoMoreInteractions(remoteLogManager) } def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long, diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index e44a6efa5e19e..53b342e10db8a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -4093,11 +4093,11 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) // set log reader threads number to 2 props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2.toString) - val remoteLogManagerConfig = new RemoteLogManagerConfig(props) + val config = KafkaConfig.fromProps(props) val mockLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config, 0, TestUtils.tempRelativeDir("data").getAbsolutePath, "clusterId", @@ -4199,11 +4199,11 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName) props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) - val remoteLogManagerConfig = new RemoteLogManagerConfig(props) + val config = KafkaConfig.fromProps(props) val dummyLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config, 0, TestUtils.tempRelativeDir("data").getAbsolutePath, "clusterId", diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 25f1334226ac8..480983edec42b 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -378,10 +378,6 @@ public String remoteLogMetadataManagerClassPath() { return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP); } - public long remoteLogIndexFileCacheTotalSizeBytes() { - return getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP); - } - public int remoteLogManagerThreadPoolSize() { return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP); } @@ -447,10 +443,6 @@ public Map getConfigProps(String configPrefixProp) { return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(originalsWithPrefix(prefixProp)); } - public long remoteLogManagerCopyMaxBytesPerSecond() { - return getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP); - } - public int remoteLogManagerCopyNumQuotaSamples() { return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP); } @@ -459,10 +451,6 @@ public int remoteLogManagerCopyQuotaWindowSizeSeconds() { return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP); } - public long remoteLogManagerFetchMaxBytesPerSecond() { - return getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP); - } - public int remoteLogManagerFetchNumQuotaSamples() { return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP); }