diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 7ec147e9e3ced..76bfe7e91a149 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -198,7 +198,10 @@ public class ConsumerConfig extends AbstractConfig { * fetch.max.wait.ms */ public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms"; - private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes."; + private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before " + + "answering the fetch request there isn't sufficient data to immediately satisfy the requirement given by " + + "fetch.min.bytes. This config is used only for local log fetch. To tune the remote fetch maximum wait " + + "time, please refer to 'remote.fetch.max.wait.ms' broker config"; public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500; /** metadata.max.age.ms */ diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala index 00d6afb89ffe2..58a866aa4a63f 100644 --- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala @@ -35,12 +35,13 @@ import scala.collection._ class DelayedRemoteFetch(remoteFetchTask: Future[Void], remoteFetchResult: CompletableFuture[RemoteLogReadResult], remoteFetchInfo: RemoteStorageFetchInfo, + remoteFetchMaxWaitMs: Long, fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], fetchParams: FetchParams, localReadResults: Seq[(TopicIdPartition, LogReadResult)], replicaManager: ReplicaManager, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit) - extends DelayedOperation(fetchParams.maxWaitMs) { + extends DelayedOperation(remoteFetchMaxWaitMs) { if (fetchParams.isFromFollower) { throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams") diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 6fa43b560ddea..9b75425666f70 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -1202,6 +1202,7 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w object DynamicRemoteLogConfig { val ReconfigurableConfigs = Set( - RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP + RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, + RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP ) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b812f299ecbf7..dcaef5f229fde 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1213,6 +1213,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP) + def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP) + validateValues() @nowarn("cat=deprecation") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6908ec096d179..6ab67a84ec426 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,9 +1479,9 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } - val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, + val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.toLong + val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) - delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) None } diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala index c35385bcbc8ca..ea1ffaf0b1179 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala @@ -40,6 +40,7 @@ class DelayedRemoteFetchTest { private val fetchOffset = 500L private val logStartOffset = 0L private val currentLeaderEpoch = Optional.of[Integer](10) + private val remoteFetchMaxWaitMs = 500 private val fetchStatus = FetchPartitionStatus( startOffsetMetadata = new LogOffsetMetadata(fetchOffset), @@ -64,8 +65,8 @@ class DelayedRemoteFetchTest { val leaderLogStartOffset = 10 val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) - val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) .thenReturn(mock(classOf[Partition])) @@ -100,8 +101,8 @@ class DelayedRemoteFetchTest { val leaderLogStartOffset = 10 val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500) - assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback)) + assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)) } @Test @@ -124,8 +125,8 @@ class DelayedRemoteFetchTest { val logReadInfo = buildReadResult(Errors.NONE) - val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) // delayed remote fetch should still be able to complete assertTrue(delayedRemoteFetch.tryComplete()) @@ -155,8 +156,8 @@ class DelayedRemoteFetchTest { // build a read result with error val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH) - val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) assertTrue(delayedRemoteFetch.tryComplete()) assertTrue(delayedRemoteFetch.isCompleted) @@ -184,8 +185,8 @@ class DelayedRemoteFetchTest { val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false) val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) - val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) .thenReturn(mock(classOf[Partition])) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 328d6e41b5722..2e5d77cdc6fae 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -792,6 +792,39 @@ class DynamicBrokerConfigTest { verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100) } + @Test + def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + val config = KafkaConfig(props) + val kafkaBroker = mock(classOf[KafkaBroker]) + when(kafkaBroker.config).thenReturn(config) + assertEquals(500, config.remoteFetchMaxWaitMs) + + val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) + config.dynamicConfig.initialize(None, None) + config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig) + + val newProps = new Properties() + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "30000") + // update default config + config.dynamicConfig.validate(newProps, perBrokerConfig = false) + config.dynamicConfig.updateDefaultConfig(newProps) + assertEquals(30000, config.remoteFetchMaxWaitMs) + + // update per broker config + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000") + config.dynamicConfig.validate(newProps, perBrokerConfig = true) + config.dynamicConfig.updateBrokerConfig(0, newProps) + assertEquals(10000, config.remoteFetchMaxWaitMs) + + // invalid values + for (maxWaitMs <- Seq(-1, 0)) { + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, maxWaitMs.toString) + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false)) + } + } + @Test def testUpdateDynamicRemoteLogManagerConfig(): Unit = { val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 6ea752886a992..4407de1e885fe 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -54,7 +54,6 @@ public final class RemoteLogManagerConfig { "implementation. For example this value can be `rlmm.config.`."; public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config."; - public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = "remote.log.storage.system.enable"; public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether to enable tiered storage functionality in a broker or not. Valid values " + "are `true` or `false` and the default value is false. When it is true broker starts all the services required for the tiered storage functionality."; @@ -175,6 +174,10 @@ public final class RemoteLogManagerConfig { "The default value is 1 second."; public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1; + public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = "remote.fetch.max.wait.ms"; + public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request"; + public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500; + public static final ConfigDef CONFIG_DEF = new ConfigDef(); static { @@ -323,7 +326,13 @@ public final class RemoteLogManagerConfig { DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, atLeast(1), MEDIUM, - REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC); + REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC) + .define(REMOTE_FETCH_MAX_WAIT_MS_PROP, + INT, + DEFAULT_REMOTE_FETCH_MAX_WAIT_MS, + atLeast(1), + MEDIUM, + REMOTE_FETCH_MAX_WAIT_MS_DOC); } private final boolean enableRemoteStorageSystem; @@ -351,6 +360,7 @@ public final class RemoteLogManagerConfig { private final long remoteLogManagerFetchMaxBytesPerSecond; private final int remoteLogManagerFetchNumQuotaSamples; private final int remoteLogManagerFetchQuotaWindowSizeSeconds; + private final int remoteFetchMaxWaitMs; public RemoteLogManagerConfig(AbstractConfig config) { this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP), @@ -381,7 +391,8 @@ public RemoteLogManagerConfig(AbstractConfig config) { config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP), config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP), config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP), - config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP)); + config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP), + config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP)); } // Visible for testing @@ -409,8 +420,8 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, int remoteLogManagerCopyQuotaWindowSizeSeconds, long remoteLogManagerFetchMaxBytesPerSecond, int remoteLogManagerFetchNumQuotaSamples, - int remoteLogManagerFetchQuotaWindowSizeSeconds - ) { + int remoteLogManagerFetchQuotaWindowSizeSeconds, + int remoteFetchMaxWaitMs) { this.enableRemoteStorageSystem = enableRemoteStorageSystem; this.remoteStorageManagerClassName = remoteStorageManagerClassName; this.remoteStorageManagerClassPath = remoteStorageManagerClassPath; @@ -436,6 +447,7 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, this.remoteLogManagerFetchMaxBytesPerSecond = remoteLogManagerFetchMaxBytesPerSecond; this.remoteLogManagerFetchNumQuotaSamples = remoteLogManagerFetchNumQuotaSamples; this.remoteLogManagerFetchQuotaWindowSizeSeconds = remoteLogManagerFetchQuotaWindowSizeSeconds; + this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs; } public boolean enableRemoteStorageSystem() { @@ -538,7 +550,6 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() { return remoteLogManagerFetchQuotaWindowSizeSeconds; } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -568,7 +579,8 @@ public boolean equals(Object o) { && remoteLogManagerCopyQuotaWindowSizeSeconds == that.remoteLogManagerCopyQuotaWindowSizeSeconds && remoteLogManagerFetchMaxBytesPerSecond == that.remoteLogManagerFetchMaxBytesPerSecond && remoteLogManagerFetchNumQuotaSamples == that.remoteLogManagerFetchNumQuotaSamples - && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds; + && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds + && remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs; } @Override @@ -580,7 +592,7 @@ public int hashCode() { remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps, remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond, remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond, - remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds); + remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs); } public static void main(String[] args) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index 45fd6669e7d4f..b119571d8e151 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -49,7 +49,7 @@ public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) { remoteLogMetadataManagerClass, "dummy.remote.log.metadata.class.path", "listener.name", 1024 * 1024L, 1, 60000L, 100L, 60000L, 0.3, 10, 100, 100, rsmPrefix, rsmProps, rlmmPrefix, rlmmProps, Long.MAX_VALUE, 11, 1, - Long.MAX_VALUE, 11, 1); + Long.MAX_VALUE, 11, 1, 500); Map props = extractProps(expectedRemoteLogManagerConfig); rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v));