From 08e36afe2c90fc684a5f1d28ec2b7dfe9b55390a Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Wed, 19 Jun 2024 12:13:57 +0530 Subject: [PATCH 1/5] KAFKA-16976: Update the current/dynamic config in RemoteLogManagerConfig. --- .../kafka/log/remote/RemoteLogManager.java | 35 ++++++------- .../scala/kafka/server/BrokerServer.scala | 2 +- .../main/scala/kafka/server/KafkaConfig.scala | 9 +--- .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../log/remote/RemoteLogManagerTest.java | 50 +++++++------------ .../server/DynamicBrokerConfigTest.scala | 49 +++++++++--------- .../kafka/server/ReplicaManagerTest.scala | 12 ++--- .../storage/RemoteLogManagerConfig.java | 46 +++++++++++++++++ 9 files changed, 114 insertions(+), 93 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 11aba6d64a53a..9f2c920e557a5 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -24,7 +24,6 @@ import kafka.log.remote.quota.RLMQuotaManager; import kafka.log.remote.quota.RLMQuotaManagerConfig; import kafka.server.BrokerTopicStats; -import kafka.server.KafkaConfig; import kafka.server.QuotaType; import kafka.server.StopPartition; import org.apache.kafka.common.KafkaException; @@ -152,7 +151,7 @@ public class RemoteLogManager implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class); private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader"; - private final KafkaConfig config; + private final RemoteLogManagerConfig rlmConfig; private final int brokerId; private final String logDir; private final Time time; @@ -193,7 +192,7 @@ public class RemoteLogManager implements Closeable { /** * Creates RemoteLogManager instance with the given arguments. * - * @param config Configuration required for remote logging subsystem(tiered storage) at the broker level. + * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level. * @param brokerId id of the current broker. * @param logDir directory of Kafka log segments. * @param time Time instance. @@ -203,7 +202,7 @@ public class RemoteLogManager implements Closeable { * @param brokerTopicStats BrokerTopicStats instance to update the respective metrics. * @param metrics Metrics instance */ - public RemoteLogManager(KafkaConfig config, + public RemoteLogManager(RemoteLogManagerConfig rlmConfig, int brokerId, String logDir, String clusterId, @@ -212,7 +211,7 @@ public RemoteLogManager(KafkaConfig config, BiConsumer updateRemoteLogStartOffset, BrokerTopicStats brokerTopicStats, Metrics metrics) throws IOException { - this.config = config; + this.rlmConfig = rlmConfig; this.brokerId = brokerId; this.logDir = logDir; this.clusterId = clusterId; @@ -227,8 +226,7 @@ public RemoteLogManager(KafkaConfig config, rlmCopyQuotaManager = createRLMCopyQuotaManager(); rlmFetchQuotaManager = createRLMFetchQuotaManager(); - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); - indexCache = new RemoteIndexCache(config.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir); + indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir); delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs(); rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize()); @@ -276,12 +274,12 @@ Duration quotaTimeout() { } RLMQuotaManager createRLMCopyQuotaManager() { - return new RLMQuotaManager(copyQuotaManagerConfig(config), metrics, QuotaType.RLMCopy$.MODULE$, + return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$, "Tracking copy byte-rate for Remote Log Manager", time); } RLMQuotaManager createRLMFetchQuotaManager() { - return new RLMQuotaManager(fetchQuotaManagerConfig(config), metrics, QuotaType.RLMFetch$.MODULE$, + return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMFetch$.MODULE$, "Tracking fetch byte-rate for Remote Log Manager", time); } @@ -289,16 +287,14 @@ public boolean isRemoteLogFetchQuotaExceeded() { return rlmFetchQuotaManager.isQuotaExceeded(); } - static RLMQuotaManagerConfig copyQuotaManagerConfig(KafkaConfig config) { - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); - return new RLMQuotaManagerConfig(config.remoteLogManagerCopyMaxBytesPerSecond(), + static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) { + return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerCopyMaxBytesPerSecond(), rlmConfig.remoteLogManagerCopyNumQuotaSamples(), rlmConfig.remoteLogManagerCopyQuotaWindowSizeSeconds()); } - static RLMQuotaManagerConfig fetchQuotaManagerConfig(KafkaConfig config) { - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); - return new RLMQuotaManagerConfig(config.remoteLogManagerFetchMaxBytesPerSecond(), + static RLMQuotaManagerConfig fetchQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) { + return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerFetchMaxBytesPerSecond(), rlmConfig.remoteLogManagerFetchNumQuotaSamples(), rlmConfig.remoteLogManagerFetchQuotaWindowSizeSeconds()); } @@ -315,7 +311,6 @@ private T createDelegate(ClassLoader classLoader, String className) { @SuppressWarnings("removal") RemoteStorageManager createRemoteStorageManager() { - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); return java.security.AccessController.doPrivileged(new PrivilegedAction() { private final String classPath = rlmConfig.remoteStorageManagerClassPath(); @@ -332,14 +327,13 @@ public RemoteStorageManager run() { } private void configureRSM() { - final Map rsmProps = new HashMap<>(config.remoteLogManagerConfig().remoteStorageManagerProps()); + final Map rsmProps = new HashMap<>(rlmConfig.remoteStorageManagerProps()); rsmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId); remoteLogStorageManager.configure(rsmProps); } @SuppressWarnings("removal") RemoteLogMetadataManager createRemoteLogMetadataManager() { - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); return java.security.AccessController.doPrivileged(new PrivilegedAction() { private final String classPath = rlmConfig.remoteLogMetadataManagerClassPath(); @@ -366,7 +360,7 @@ private void configureRLMM() { rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", e.securityProtocol().name); }); // update the remoteLogMetadataProps here to override endpoint config if any - rlmmProps.putAll(config.remoteLogManagerConfig().remoteLogMetadataManagerProps()); + rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps()); rlmmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId); rlmmProps.put(LOG_DIR_CONFIG, logDir); @@ -418,7 +412,7 @@ public void onLeadershipChange(Set partitionsBecomeLeader, Map topicIds) { LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); - if (config.remoteLogManagerConfig().isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) { + if (rlmConfig.isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) { throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled"); } @@ -1748,7 +1742,6 @@ public Future asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer convertToLeaderOrFollower) { - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition, topicIdPartition -> { RLMTask task = new RLMTask(topicIdPartition, rlmConfig.remoteLogMetadataCustomMetadataMaxBytes()); diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index a03128ab9f52b..7e225440abf61 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -621,7 +621,7 @@ class BrokerServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { - Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time, + Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { logManager.getLog(tp).foreach { log => diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index f2cddda700fca..5ab580a4ae2f0 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -195,6 +195,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = { this.currentConfig = newConfig + this._remoteLogManagerConfig.updateCurrentConfig(currentConfig.props) } // The following captures any system properties impacting ZooKeeper TLS configuration @@ -869,14 +870,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP) - def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP) - - def remoteLogIndexFileCacheTotalSizeBytes: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) - - def remoteLogManagerCopyMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP) - - def remoteLogManagerFetchMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP) - validateValues() @nowarn("cat=deprecation") diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 3154ab1c344eb..dffd2e7f697fb 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -691,7 +691,7 @@ class KafkaServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { - Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time, + Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { logManager.getLog(tp).foreach { log => diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6ab67a84ec426..3615788aeab52 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,7 +1479,7 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } - val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.toLong + val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index bfb490d77da85..e4e588b7b6619 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -193,7 +193,7 @@ public class RemoteLogManagerTest { private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class); private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); - private KafkaConfig config; + private RemoteLogManagerConfig remoteLogManagerConfig; private BrokerTopicStats brokerTopicStats = null; private final Metrics metrics = new Metrics(time); @@ -223,11 +223,10 @@ void setUp() throws Exception { Properties props = kafka.utils.TestUtils.createDummyBrokerConfig(); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); - createRLMConfig(props); - config = KafkaConfig.fromProps(props); + remoteLogManagerConfig = createRLMConfig(props); brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled()); - remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, + remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), brokerTopicStats, metrics) { @@ -379,13 +378,10 @@ void testRemoteLogMetadataManagerWithEndpointConfig() { @Test void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOException { Properties props = new Properties(); - props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); // override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix" props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL"); - createRLMConfig(props); - KafkaConfig config = KafkaConfig.fromProps(props); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config, + createRLMConfig(props), brokerId, logDir, clusterId, @@ -1286,7 +1282,7 @@ private void verifyLogSegmentData(LogSegmentData logSegmentData, void testGetClassLoaderAwareRemoteStorageManager() throws Exception { ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); try (RemoteLogManager remoteLogManager = - new RemoteLogManager(config, brokerId, logDir, clusterId, time, + new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, t -> Optional.empty(), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1548,7 +1544,7 @@ void testIdempotentClose() throws IOException { public void testRemoveMetricsOnClose() throws IOException { MockedConstruction mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class); try { - RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, + RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; @@ -1943,7 +1939,7 @@ public void testFindLogStartOffset() throws RemoteStorageException, IOException else return Collections.emptyIterator(); }); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1968,7 +1964,7 @@ public void testFindLogStartOffsetFallbackToLocalLogStartOffsetWhenRemoteIsEmpty when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt())) .thenReturn(Collections.emptyIterator()); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -2002,7 +1998,7 @@ public void testLogStartOffsetUpdatedOnStartup() throws RemoteStorageException, }); AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2434,7 +2430,7 @@ private void verifyDeleteLogSegment(List segmentMetada @Test public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException { AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2576,7 +2572,7 @@ public void testReadForMissingFirstBatchInRemote() throws RemoteStorageException ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config, + remoteLogManagerConfig, brokerId, logDir, clusterId, @@ -2649,7 +2645,7 @@ public void testReadForFirstBatchMoreThanMaxFetchBytes(boolean minOneMessage) th ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config, + remoteLogManagerConfig, brokerId, logDir, clusterId, @@ -2734,7 +2730,7 @@ public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config, + remoteLogManagerConfig, brokerId, logDir, clusterId, @@ -2779,23 +2775,18 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l @Test public void testCopyQuotaManagerConfig() { Properties defaultProps = new Properties(); - defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); - createRLMConfig(defaultProps); - KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); + RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); - customProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); - createRLMConfig(customProps); - KafkaConfig config = KafkaConfig.fromProps(customProps); - - RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config); + RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); + RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig); assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples()); assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds()); @@ -2804,22 +2795,17 @@ public void testCopyQuotaManagerConfig() { @Test public void testFetchQuotaManagerConfig() { Properties defaultProps = new Properties(); - defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); - createRLMConfig(defaultProps); - KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); - + RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); - customProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); - createRLMConfig(customProps); - KafkaConfig rlmConfig = KafkaConfig.fromProps(customProps); + RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig); assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples()); diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 8dad66919bc06..7095f253e1800 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -798,7 +798,7 @@ class DynamicBrokerConfigTest { val config = KafkaConfig(props) val kafkaBroker = mock(classOf[KafkaBroker]) when(kafkaBroker.config).thenReturn(config) - assertEquals(500, config.remoteFetchMaxWaitMs) + assertEquals(500, config.remoteLogManagerConfig.remoteFetchMaxWaitMs) val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) config.dynamicConfig.initialize(None, None) @@ -809,13 +809,13 @@ class DynamicBrokerConfigTest { // update default config config.dynamicConfig.validate(newProps, perBrokerConfig = false) config.dynamicConfig.updateDefaultConfig(newProps) - assertEquals(30000, config.remoteFetchMaxWaitMs) + assertEquals(30000, config.remoteLogManagerConfig.remoteFetchMaxWaitMs) // update per broker config newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000") config.dynamicConfig.validate(newProps, perBrokerConfig = true) config.dynamicConfig.updateBrokerConfig(0, newProps) - assertEquals(10000, config.remoteFetchMaxWaitMs) + assertEquals(10000, config.remoteLogManagerConfig.remoteFetchMaxWaitMs) // invalid values for (maxWaitMs <- Seq(-1, 0)) { @@ -832,10 +832,10 @@ class DynamicBrokerConfigTest { val config = KafkaConfig(origProps) val serverMock = Mockito.mock(classOf[KafkaBroker]) - val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager])) + val remoteLogManager = Mockito.mock(classOf[RemoteLogManager]) Mockito.when(serverMock.config).thenReturn(config) - Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) @@ -844,10 +844,10 @@ class DynamicBrokerConfigTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "4") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(4L, config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) - Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4) + assertEquals(4L, config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes()) + Mockito.verify(remoteLogManager).resizeCacheSize(4) - Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) + Mockito.verifyNoMoreInteractions(remoteLogManager) } @Test @@ -864,18 +864,18 @@ class DynamicBrokerConfigTest { config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, - config.remoteLogManagerCopyMaxBytesPerSecond) + config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) // Update default config props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "100") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond) + assertEquals(100, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) verify(remoteLogManager).updateCopyQuota(100) // Update per broker config props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "200") config.dynamicConfig.updateBrokerConfig(0, props) - assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond) + assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) verify(remoteLogManager).updateCopyQuota(200) verifyNoMoreInteractions(remoteLogManager) @@ -895,18 +895,18 @@ class DynamicBrokerConfigTest { config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, - config.remoteLogManagerFetchMaxBytesPerSecond) + config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) // Update default config props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "100") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(100, config.remoteLogManagerFetchMaxBytesPerSecond) + assertEquals(100, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) verify(remoteLogManager).updateFetchQuota(100) // Update per broker config props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "200") config.dynamicConfig.updateBrokerConfig(0, props) - assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond) + assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) verify(remoteLogManager).updateFetchQuota(200) verifyNoMoreInteractions(remoteLogManager) @@ -930,18 +930,21 @@ class DynamicBrokerConfigTest { config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) // Default values - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, config.remoteLogIndexFileCacheTotalSizeBytes) - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.remoteLogManagerCopyMaxBytesPerSecond) - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, config.remoteLogManagerFetchMaxBytesPerSecond) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, + config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes()) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, + config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, + config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) // Update default config props.put(indexFileCacheSizeProp, "4") props.put(copyQuotaProp, "100") props.put(fetchQuotaProp, "200") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(4, config.remoteLogIndexFileCacheTotalSizeBytes) - assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond) - assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond) + assertEquals(4, config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes()) + assertEquals(100, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) + assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) verify(remoteLogManager).resizeCacheSize(4) verify(remoteLogManager).updateCopyQuota(100) verify(remoteLogManager).updateFetchQuota(200) @@ -951,9 +954,9 @@ class DynamicBrokerConfigTest { props.put(copyQuotaProp, "200") props.put(fetchQuotaProp, "400") config.dynamicConfig.updateBrokerConfig(0, props) - assertEquals(8, config.remoteLogIndexFileCacheTotalSizeBytes) - assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond) - assertEquals(400, config.remoteLogManagerFetchMaxBytesPerSecond) + assertEquals(8, config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes()) + assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) + assertEquals(400, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) verify(remoteLogManager).resizeCacheSize(8) verify(remoteLogManager).updateCopyQuota(200) verify(remoteLogManager).updateFetchQuota(400) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 53b342e10db8a..e44a6efa5e19e 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -4093,11 +4093,11 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) // set log reader threads number to 2 props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2.toString) - val config = KafkaConfig.fromProps(props) + val remoteLogManagerConfig = new RemoteLogManagerConfig(props) val mockLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( - config, + remoteLogManagerConfig, 0, TestUtils.tempRelativeDir("data").getAbsolutePath, "clusterId", @@ -4199,11 +4199,11 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName) props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) - val config = KafkaConfig.fromProps(props) + val remoteLogManagerConfig = new RemoteLogManagerConfig(props) val dummyLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( - config, + remoteLogManagerConfig, 0, TestUtils.tempRelativeDir("data").getAbsolutePath, "clusterId", diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 480983edec42b..3008ded8109ae 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -186,6 +186,12 @@ public final class RemoteLogManagerConfig extends AbstractConfig { public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request"; public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500; + // dynamic configs + private long remoteLogIndexFileCacheTotalSizeBytes; + private long remoteLogManagerCopyMaxBytesPerSecond; + private long remoteLogManagerFetchMaxBytesPerSecond; + private int remoteFetchMaxWaitMs; + public static ConfigDef configDef() { return new ConfigDef() .define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, @@ -356,6 +362,29 @@ public static ConfigDef configDef() { public RemoteLogManagerConfig(Map props) { super(configDef(), props); + this.remoteLogIndexFileCacheTotalSizeBytes = getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP); + this.remoteLogManagerCopyMaxBytesPerSecond = getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP); + this.remoteLogManagerFetchMaxBytesPerSecond = getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP); + this.remoteFetchMaxWaitMs = getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP); + } + + /** + * Update the current configuration with the given properties. + * @param props + */ + public void updateCurrentConfig(Map props) { + if (props.containsKey(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) { + this.remoteLogIndexFileCacheTotalSizeBytes = Long.parseLong(String.valueOf(props.get(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))); + } + if (props.containsKey(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)) { + this.remoteLogManagerCopyMaxBytesPerSecond = Long.parseLong(String.valueOf(props.get(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP))); + } + if (props.containsKey(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)) { + this.remoteLogManagerFetchMaxBytesPerSecond = Long.parseLong(String.valueOf(props.get(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP))); + } + if (props.containsKey(REMOTE_FETCH_MAX_WAIT_MS_PROP)) { + this.remoteFetchMaxWaitMs = Integer.parseInt(String.valueOf(props.get(REMOTE_FETCH_MAX_WAIT_MS_PROP))); + } } public boolean isRemoteStorageSystemEnabled() { @@ -459,6 +488,23 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() { return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); } + // Dynamic Configs + public long remoteLogIndexFileCacheTotalSizeBytes() { + return remoteLogIndexFileCacheTotalSizeBytes; + } + + public long remoteLogManagerCopyMaxBytesPerSecond() { + return remoteLogManagerCopyMaxBytesPerSecond; + } + + public long remoteLogManagerFetchMaxBytesPerSecond() { + return remoteLogManagerFetchMaxBytesPerSecond; + } + + public int remoteFetchMaxWaitMs() { + return remoteFetchMaxWaitMs; + } + public static void main(String[] args) { System.out.println(configDef().toHtml(4, config -> "remote_log_manager_" + config)); } From 08912b932f686bf7e89116485a2bdf270fb942f5 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Wed, 19 Jun 2024 12:23:58 +0530 Subject: [PATCH 2/5] Add unit test --- .../storage/RemoteLogManagerConfigTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index 6904c8f3d4b89..35e28f495fe9b 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -69,6 +69,21 @@ public void testValidateEmptyStringConfig() { new RemoteLogManagerConfig(emptyStringProps)); } + @Test + public void testDynamicConfigs() { + Map emptyProps = new HashMap<>(); + RemoteLogManagerConfig rlmConfig = new RemoteLogManagerConfig(emptyProps); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, + rlmConfig.remoteLogManagerCopyMaxBytesPerSecond()); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, + rlmConfig.remoteLogManagerFetchMaxBytesPerSecond()); + + rlmConfig.updateCurrentConfig(Collections.singletonMap(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100L)); + assertEquals(100L, rlmConfig.remoteLogManagerCopyMaxBytesPerSecond()); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, + rlmConfig.remoteLogManagerFetchMaxBytesPerSecond()); + } + private Map getRLMProps(String rsmPrefix, String rlmmPrefix) { Map props = new HashMap<>(); From 2d46ab31f467bab544e1c2ef4dbf19de1adea2ae Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Wed, 19 Jun 2024 22:36:28 +0530 Subject: [PATCH 3/5] make dynamic config to be thread-safe --- .../server/log/remote/storage/RemoteLogManagerConfig.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 3008ded8109ae..8b9bf27b2e0e6 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -187,10 +187,10 @@ public final class RemoteLogManagerConfig extends AbstractConfig { public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500; // dynamic configs - private long remoteLogIndexFileCacheTotalSizeBytes; - private long remoteLogManagerCopyMaxBytesPerSecond; - private long remoteLogManagerFetchMaxBytesPerSecond; - private int remoteFetchMaxWaitMs; + private volatile long remoteLogIndexFileCacheTotalSizeBytes; + private volatile long remoteLogManagerCopyMaxBytesPerSecond; + private volatile long remoteLogManagerFetchMaxBytesPerSecond; + private volatile int remoteFetchMaxWaitMs; public static ConfigDef configDef() { return new ConfigDef() From 39b8d92a055a9276dae8f0b32da2014ba73e6729 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Thu, 20 Jun 2024 10:35:01 +0530 Subject: [PATCH 4/5] Addressed the review comments. --- .../main/scala/kafka/server/KafkaConfig.scala | 3 +- .../log/remote/RemoteLogManagerTest.java | 70 ++++++++------ .../server/DynamicBrokerConfigTest.scala | 1 + .../kafka/server/ReplicaManagerTest.scala | 12 +-- .../storage/RemoteLogManagerConfig.java | 93 +++++++------------ .../storage/RemoteLogManagerConfigTest.java | 48 +++++----- 6 files changed, 106 insertions(+), 121 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5ab580a4ae2f0..f23c539d1231b 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -195,7 +195,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = { this.currentConfig = newConfig - this._remoteLogManagerConfig.updateCurrentConfig(currentConfig.props) } // The following captures any system properties impacting ZooKeeper TLS configuration @@ -232,7 +231,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG) val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG) - private val _remoteLogManagerConfig = new RemoteLogManagerConfig(props) + private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) def remoteLogManagerConfig = _remoteLogManagerConfig private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = { diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index e4e588b7b6619..00f6b6e8edf93 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -193,7 +193,7 @@ public class RemoteLogManagerTest { private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class); private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); - private RemoteLogManagerConfig remoteLogManagerConfig; + private KafkaConfig config; private BrokerTopicStats brokerTopicStats = null; private final Metrics metrics = new Metrics(time); @@ -223,10 +223,11 @@ void setUp() throws Exception { Properties props = kafka.utils.TestUtils.createDummyBrokerConfig(); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); - remoteLogManagerConfig = createRLMConfig(props); + appendRLMConfig(props); + config = KafkaConfig.fromProps(props); brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled()); - remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), brokerTopicStats, metrics) { @@ -334,11 +335,14 @@ void testRemoteLogMetadataManagerWithUserDefinedConfigs() { String key = "key"; String configPrefix = "config.prefix"; Properties props = new Properties(); + props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, configPrefix); props.put(configPrefix + key, "world"); props.put("remote.log.metadata.y", "z"); + appendRLMConfig(props); + KafkaConfig config = KafkaConfig.fromProps(props); - Map metadataMangerConfig = createRLMConfig(props).remoteLogMetadataManagerProps(); + Map metadataMangerConfig = config.remoteLogManagerConfig().remoteLogMetadataManagerProps(); assertEquals(props.get(configPrefix + key), metadataMangerConfig.get(key)); assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y")); } @@ -348,11 +352,14 @@ void testRemoteStorageManagerWithUserDefinedConfigs() { String key = "key"; String configPrefix = "config.prefix"; Properties props = new Properties(); + props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, configPrefix); props.put(configPrefix + key, "world"); props.put("remote.storage.manager.y", "z"); + appendRLMConfig(props); + KafkaConfig config = KafkaConfig.fromProps(props); - Map remoteStorageManagerConfig = createRLMConfig(props).remoteStorageManagerProps(); + Map remoteStorageManagerConfig = config.remoteLogManagerConfig().remoteStorageManagerProps(); assertEquals(props.get(configPrefix + key), remoteStorageManagerConfig.get(key)); assertFalse(remoteStorageManagerConfig.containsKey("remote.storage.manager.y")); } @@ -378,10 +385,13 @@ void testRemoteLogMetadataManagerWithEndpointConfig() { @Test void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOException { Properties props = new Properties(); + props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); // override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix" props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL"); + appendRLMConfig(props); + KafkaConfig config = KafkaConfig.fromProps(props); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - createRLMConfig(props), + config.remoteLogManagerConfig(), brokerId, logDir, clusterId, @@ -1282,7 +1292,7 @@ private void verifyLogSegmentData(LogSegmentData logSegmentData, void testGetClassLoaderAwareRemoteStorageManager() throws Exception { ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); try (RemoteLogManager remoteLogManager = - new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, t -> Optional.empty(), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1544,7 +1554,7 @@ void testIdempotentClose() throws IOException { public void testRemoveMetricsOnClose() throws IOException { MockedConstruction mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class); try { - RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, + RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; @@ -1939,7 +1949,7 @@ public void testFindLogStartOffset() throws RemoteStorageException, IOException else return Collections.emptyIterator(); }); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1964,7 +1974,7 @@ public void testFindLogStartOffsetFallbackToLocalLogStartOffsetWhenRemoteIsEmpty when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt())) .thenReturn(Collections.emptyIterator()); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1998,7 +2008,7 @@ public void testLogStartOffsetUpdatedOnStartup() throws RemoteStorageException, }); AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2430,7 +2440,7 @@ private void verifyDeleteLogSegment(List segmentMetada @Test public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException { AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2572,7 +2582,7 @@ public void testReadForMissingFirstBatchInRemote() throws RemoteStorageException ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config.remoteLogManagerConfig(), brokerId, logDir, clusterId, @@ -2645,7 +2655,7 @@ public void testReadForFirstBatchMoreThanMaxFetchBytes(boolean minOneMessage) th ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config.remoteLogManagerConfig(), brokerId, logDir, clusterId, @@ -2730,7 +2740,7 @@ public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException try (RemoteLogManager remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config.remoteLogManagerConfig(), brokerId, logDir, clusterId, @@ -2775,18 +2785,23 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l @Test public void testCopyQuotaManagerConfig() { Properties defaultProps = new Properties(); - RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); - RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig); + defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); + appendRLMConfig(defaultProps); + KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); + RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); + customProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); - RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); - RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig); + appendRLMConfig(customProps); + KafkaConfig config = KafkaConfig.fromProps(customProps); + + RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config.remoteLogManagerConfig()); assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples()); assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds()); @@ -2795,18 +2810,23 @@ public void testCopyQuotaManagerConfig() { @Test public void testFetchQuotaManagerConfig() { Properties defaultProps = new Properties(); - RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); - RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig); + defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); + appendRLMConfig(defaultProps); + KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); + + RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); + customProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); - RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); - RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig); + appendRLMConfig(customProps); + KafkaConfig rlmConfig = KafkaConfig.fromProps(customProps); + RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig.remoteLogManagerConfig()); assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples()); assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds()); @@ -3058,7 +3078,7 @@ private Partition mockPartition(TopicIdPartition topicIdPartition) { return partition; } - private RemoteLogManagerConfig createRLMConfig(Properties props) { + private void appendRLMConfig(Properties props) { props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName()); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName()); @@ -3069,8 +3089,6 @@ private RemoteLogManagerConfig createRLMConfig(Properties props) { props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataCommonClientTestProp, remoteLogMetadataCommonClientTestVal); props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal); props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal); - - return new RemoteLogManagerConfig(props); } } diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 7095f253e1800..a9d6eea9373f3 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -798,6 +798,7 @@ class DynamicBrokerConfigTest { val config = KafkaConfig(props) val kafkaBroker = mock(classOf[KafkaBroker]) when(kafkaBroker.config).thenReturn(config) + when(kafkaBroker.remoteLogManagerOpt).thenReturn(None) assertEquals(500, config.remoteLogManagerConfig.remoteFetchMaxWaitMs) val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index e44a6efa5e19e..a1f3ce002c2f2 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -4093,11 +4093,11 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) // set log reader threads number to 2 props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2.toString) - val remoteLogManagerConfig = new RemoteLogManagerConfig(props) + val config = KafkaConfig.fromProps(props) val mockLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config.remoteLogManagerConfig, 0, TestUtils.tempRelativeDir("data").getAbsolutePath, "clusterId", @@ -4199,11 +4199,11 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName) props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) - val remoteLogManagerConfig = new RemoteLogManagerConfig(props) + val config = KafkaConfig.fromProps(props) val dummyLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( - remoteLogManagerConfig, + config.remoteLogManagerConfig, 0, TestUtils.tempRelativeDir("data").getAbsolutePath, "clusterId", diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 8b9bf27b2e0e6..d6eb876f11e19 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -32,7 +32,7 @@ import static org.apache.kafka.common.config.ConfigDef.Type.LONG; import static org.apache.kafka.common.config.ConfigDef.Type.STRING; -public final class RemoteLogManagerConfig extends AbstractConfig { +public final class RemoteLogManagerConfig { /** * Prefix used for properties to be passed to {@link RemoteStorageManager} implementation. Remote log subsystem collects all the properties having @@ -186,11 +186,7 @@ public final class RemoteLogManagerConfig extends AbstractConfig { public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request"; public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500; - // dynamic configs - private volatile long remoteLogIndexFileCacheTotalSizeBytes; - private volatile long remoteLogManagerCopyMaxBytesPerSecond; - private volatile long remoteLogManagerFetchMaxBytesPerSecond; - private volatile int remoteFetchMaxWaitMs; + private final AbstractConfig config; public static ConfigDef configDef() { return new ConfigDef() @@ -359,104 +355,81 @@ public static ConfigDef configDef() { MEDIUM, REMOTE_FETCH_MAX_WAIT_MS_DOC); } - - public RemoteLogManagerConfig(Map props) { - super(configDef(), props); - this.remoteLogIndexFileCacheTotalSizeBytes = getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP); - this.remoteLogManagerCopyMaxBytesPerSecond = getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP); - this.remoteLogManagerFetchMaxBytesPerSecond = getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP); - this.remoteFetchMaxWaitMs = getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP); - } - - /** - * Update the current configuration with the given properties. - * @param props - */ - public void updateCurrentConfig(Map props) { - if (props.containsKey(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) { - this.remoteLogIndexFileCacheTotalSizeBytes = Long.parseLong(String.valueOf(props.get(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))); - } - if (props.containsKey(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)) { - this.remoteLogManagerCopyMaxBytesPerSecond = Long.parseLong(String.valueOf(props.get(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP))); - } - if (props.containsKey(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)) { - this.remoteLogManagerFetchMaxBytesPerSecond = Long.parseLong(String.valueOf(props.get(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP))); - } - if (props.containsKey(REMOTE_FETCH_MAX_WAIT_MS_PROP)) { - this.remoteFetchMaxWaitMs = Integer.parseInt(String.valueOf(props.get(REMOTE_FETCH_MAX_WAIT_MS_PROP))); - } + + public RemoteLogManagerConfig(AbstractConfig config) { + this.config = config; } public boolean isRemoteStorageSystemEnabled() { - return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); + return config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); } public String remoteStorageManagerClassName() { - return getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP); + return config.getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP); } public String remoteStorageManagerClassPath() { - return getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP); + return config.getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP); } public String remoteLogMetadataManagerClassName() { - return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); + return config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); } public String remoteLogMetadataManagerClassPath() { - return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP); + return config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP); } public int remoteLogManagerThreadPoolSize() { - return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP); + return config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP); } public int remoteLogManagerCopierThreadPoolSize() { - return getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); + return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); } public int remoteLogManagerExpirationThreadPoolSize() { - return getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); + return config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); } public long remoteLogManagerTaskIntervalMs() { - return getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP); + return config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP); } public long remoteLogManagerTaskRetryBackoffMs() { - return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP); + return config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP); } public long remoteLogManagerTaskRetryBackoffMaxMs() { - return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP); + return config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP); } public double remoteLogManagerTaskRetryJitter() { - return getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP); + return config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP); } public int remoteLogReaderThreads() { - return getInt(REMOTE_LOG_READER_THREADS_PROP); + return config.getInt(REMOTE_LOG_READER_THREADS_PROP); } public int remoteLogReaderMaxPendingTasks() { - return getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP); + return config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP); } public String remoteLogMetadataManagerListenerName() { - return getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP); + return config.getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP); } public int remoteLogMetadataCustomMetadataMaxBytes() { - return getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP); + return config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP); } public String remoteStorageManagerPrefix() { - return getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP); + return config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP); } public String remoteLogMetadataManagerPrefix() { - return getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP); + return config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP); } public Map remoteStorageManagerProps() { @@ -468,41 +441,41 @@ public Map remoteLogMetadataManagerProps() { } public Map getConfigProps(String configPrefixProp) { - String prefixProp = getString(configPrefixProp); - return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(originalsWithPrefix(prefixProp)); + String prefixProp = config.getString(configPrefixProp); + return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(config.originalsWithPrefix(prefixProp)); } public int remoteLogManagerCopyNumQuotaSamples() { - return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP); + return config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP); } public int remoteLogManagerCopyQuotaWindowSizeSeconds() { - return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP); + return config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP); } public int remoteLogManagerFetchNumQuotaSamples() { - return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP); + return config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP); } public int remoteLogManagerFetchQuotaWindowSizeSeconds() { - return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); + return config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); } // Dynamic Configs public long remoteLogIndexFileCacheTotalSizeBytes() { - return remoteLogIndexFileCacheTotalSizeBytes; + return config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP); } public long remoteLogManagerCopyMaxBytesPerSecond() { - return remoteLogManagerCopyMaxBytesPerSecond; + return config.getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP); } public long remoteLogManagerFetchMaxBytesPerSecond() { - return remoteLogManagerFetchMaxBytesPerSecond; + return config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP); } public int remoteFetchMaxWaitMs() { - return remoteFetchMaxWaitMs; + return config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP); } public static void main(String[] args) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index 35e28f495fe9b..cb28f71a45609 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.server.log.remote.storage; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Test; @@ -39,24 +40,18 @@ public void testValidConfigs() { Map props = getRLMProps(rsmPrefix, rlmmPrefix); rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v)); rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v)); + RLMTestConfig config = new RLMTestConfig(props); - RemoteLogManagerConfig expectedRemoteLogManagerConfig = new RemoteLogManagerConfig(props); - - // Removing remote.log.metadata.manager.class.name so that the default value gets picked up. - props.remove(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); - - RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(props); - assertEquals(expectedRemoteLogManagerConfig.values(), remoteLogManagerConfig.values()); - - assertEquals(rsmProps, remoteLogManagerConfig.remoteStorageManagerProps()); - assertEquals(rlmmProps, remoteLogManagerConfig.remoteLogMetadataManagerProps()); + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); + assertEquals(rsmProps, rlmConfig.remoteStorageManagerProps()); + assertEquals(rlmmProps, rlmConfig.remoteLogMetadataManagerProps()); } @Test public void testDefaultConfigs() { // Even with empty properties, RemoteLogManagerConfig has default values Map emptyProps = new HashMap<>(); - RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RemoteLogManagerConfig(emptyProps); + RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig(); assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize()); assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples()); } @@ -66,22 +61,7 @@ public void testValidateEmptyStringConfig() { // Test with a empty string props should throw ConfigException Map emptyStringProps = Collections.singletonMap(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, ""); assertThrows(ConfigException.class, () -> - new RemoteLogManagerConfig(emptyStringProps)); - } - - @Test - public void testDynamicConfigs() { - Map emptyProps = new HashMap<>(); - RemoteLogManagerConfig rlmConfig = new RemoteLogManagerConfig(emptyProps); - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, - rlmConfig.remoteLogManagerCopyMaxBytesPerSecond()); - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, - rlmConfig.remoteLogManagerFetchMaxBytesPerSecond()); - - rlmConfig.updateCurrentConfig(Collections.singletonMap(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100L)); - assertEquals(100L, rlmConfig.remoteLogManagerCopyMaxBytesPerSecond()); - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, - rlmConfig.remoteLogManagerFetchMaxBytesPerSecond()); + new RLMTestConfig(emptyStringProps).remoteLogManagerConfig()); } private Map getRLMProps(String rsmPrefix, String rlmmPrefix) { @@ -126,4 +106,18 @@ private Map getRLMProps(String rsmPrefix, String rlmmPrefix) { rlmmPrefix); return props; } + + private static class RLMTestConfig extends AbstractConfig { + + private final RemoteLogManagerConfig rlmConfig; + + public RLMTestConfig(Map originals) { + super(RemoteLogManagerConfig.configDef(), originals, true); + rlmConfig = new RemoteLogManagerConfig(this); + } + + public RemoteLogManagerConfig remoteLogManagerConfig() { + return rlmConfig; + } + } } \ No newline at end of file From daf36fd7bf363574270be10d8423a6bb8e702fc7 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Thu, 20 Jun 2024 10:51:38 +0530 Subject: [PATCH 5/5] remove comment --- .../kafka/server/log/remote/storage/RemoteLogManagerConfig.java | 1 - 1 file changed, 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index d6eb876f11e19..fabb4f7c787e9 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -461,7 +461,6 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() { return config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); } - // Dynamic Configs public long remoteLogIndexFileCacheTotalSizeBytes() { return config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP); }