Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,10 @@ public class ConsumerConfig extends AbstractConfig {
* <code>fetch.max.wait.ms</code>
*/
public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before " +
"answering the fetch request there isn't sufficient data to immediately satisfy the requirement given by " +
"fetch.min.bytes. This config is used only for local log fetch. To tune the remote fetch maximum wait " +
"time, please refer to 'remote.fetch.max.wait.ms' broker config";
public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500;

/** <code>metadata.max.age.ms</code> */
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ import scala.collection._
class DelayedRemoteFetch(remoteFetchTask: Future[Void],
remoteFetchResult: CompletableFuture[RemoteLogReadResult],
remoteFetchInfo: RemoteStorageFetchInfo,
remoteFetchMaxWaitMs: Long,
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
fetchParams: FetchParams,
localReadResults: Seq[(TopicIdPartition, LogReadResult)],
replicaManager: ReplicaManager,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
extends DelayedOperation(fetchParams.maxWaitMs) {
extends DelayedOperation(remoteFetchMaxWaitMs) {

if (fetchParams.isFromFollower) {
throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams")
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,7 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w

object DynamicRemoteLogConfig {
val ReconfigurableConfigs = Set(
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP
)
}
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami

def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP)

def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP)

validateValues()

@nowarn("cat=deprecation")
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1479,9 +1479,9 @@ class ReplicaManager(val config: KafkaConfig,
return Some(createLogReadResult(e))
}

val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo,
val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.toLong
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
fetchPartitionStatus, params, logReadResults, this, responseCallback)

delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class DelayedRemoteFetchTest {
private val fetchOffset = 500L
private val logStartOffset = 0L
private val currentLeaderEpoch = Optional.of[Integer](10)
private val remoteFetchMaxWaitMs = 500

private val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
Expand All @@ -64,8 +65,8 @@ class DelayedRemoteFetchTest {
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)

val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)

when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
Expand Down Expand Up @@ -100,8 +101,8 @@ class DelayedRemoteFetchTest {
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500)
assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback))
assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback))
}

@Test
Expand All @@ -124,8 +125,8 @@ class DelayedRemoteFetchTest {

val logReadInfo = buildReadResult(Errors.NONE)

val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)

// delayed remote fetch should still be able to complete
assertTrue(delayedRemoteFetch.tryComplete())
Expand Down Expand Up @@ -155,8 +156,8 @@ class DelayedRemoteFetchTest {
// build a read result with error
val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)

val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)

assertTrue(delayedRemoteFetch.tryComplete())
assertTrue(delayedRemoteFetch.isCompleted)
Expand Down Expand Up @@ -184,8 +185,8 @@ class DelayedRemoteFetchTest {
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false)
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)

val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)

when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,39 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}

@Test
def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val config = KafkaConfig(props)
val kafkaBroker = mock(classOf[KafkaBroker])
when(kafkaBroker.config).thenReturn(config)
assertEquals(500, config.remoteFetchMaxWaitMs)

val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
config.dynamicConfig.initialize(None, None)
config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)

val newProps = new Properties()
newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "30000")
// update default config
config.dynamicConfig.validate(newProps, perBrokerConfig = false)
config.dynamicConfig.updateDefaultConfig(newProps)
assertEquals(30000, config.remoteFetchMaxWaitMs)

// update per broker config
newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000")
config.dynamicConfig.validate(newProps, perBrokerConfig = true)
config.dynamicConfig.updateBrokerConfig(0, newProps)
assertEquals(10000, config.remoteFetchMaxWaitMs)

// invalid values
for (maxWaitMs <- Seq(-1, 0)) {
newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, maxWaitMs.toString)
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true))
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false))
}
}

@Test
def testUpdateDynamicRemoteLogManagerConfig(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public final class RemoteLogManagerConfig {
"implementation. For example this value can be `rlmm.config.`.";
public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config.";


public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = "remote.log.storage.system.enable";
public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether to enable tiered storage functionality in a broker or not. Valid values " +
"are `true` or `false` and the default value is false. When it is true broker starts all the services required for the tiered storage functionality.";
Expand Down Expand Up @@ -175,6 +174,10 @@ public final class RemoteLogManagerConfig {
"The default value is 1 second.";
public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1;

public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = "remote.fetch.max.wait.ms";
public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request";
public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;

public static final ConfigDef CONFIG_DEF = new ConfigDef();

static {
Expand Down Expand Up @@ -323,7 +326,13 @@ public final class RemoteLogManagerConfig {
DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS,
atLeast(1),
MEDIUM,
REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC);
REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC)
.define(REMOTE_FETCH_MAX_WAIT_MS_PROP,
INT,
DEFAULT_REMOTE_FETCH_MAX_WAIT_MS,
atLeast(1),
MEDIUM,
REMOTE_FETCH_MAX_WAIT_MS_DOC);
}

private final boolean enableRemoteStorageSystem;
Expand Down Expand Up @@ -351,6 +360,7 @@ public final class RemoteLogManagerConfig {
private final long remoteLogManagerFetchMaxBytesPerSecond;
private final int remoteLogManagerFetchNumQuotaSamples;
private final int remoteLogManagerFetchQuotaWindowSizeSeconds;
private final int remoteFetchMaxWaitMs;

public RemoteLogManagerConfig(AbstractConfig config) {
this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP),
Expand Down Expand Up @@ -381,7 +391,8 @@ public RemoteLogManagerConfig(AbstractConfig config) {
config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP),
config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP),
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP),
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP));
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP),
config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP));
}

// Visible for testing
Expand Down Expand Up @@ -409,8 +420,8 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem,
int remoteLogManagerCopyQuotaWindowSizeSeconds,
long remoteLogManagerFetchMaxBytesPerSecond,
int remoteLogManagerFetchNumQuotaSamples,
int remoteLogManagerFetchQuotaWindowSizeSeconds
) {
int remoteLogManagerFetchQuotaWindowSizeSeconds,
int remoteFetchMaxWaitMs) {
this.enableRemoteStorageSystem = enableRemoteStorageSystem;
this.remoteStorageManagerClassName = remoteStorageManagerClassName;
this.remoteStorageManagerClassPath = remoteStorageManagerClassPath;
Expand All @@ -436,6 +447,7 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem,
this.remoteLogManagerFetchMaxBytesPerSecond = remoteLogManagerFetchMaxBytesPerSecond;
this.remoteLogManagerFetchNumQuotaSamples = remoteLogManagerFetchNumQuotaSamples;
this.remoteLogManagerFetchQuotaWindowSizeSeconds = remoteLogManagerFetchQuotaWindowSizeSeconds;
this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
}

public boolean enableRemoteStorageSystem() {
Expand Down Expand Up @@ -538,7 +550,6 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() {
return remoteLogManagerFetchQuotaWindowSizeSeconds;
}


@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -568,7 +579,8 @@ public boolean equals(Object o) {
&& remoteLogManagerCopyQuotaWindowSizeSeconds == that.remoteLogManagerCopyQuotaWindowSizeSeconds
&& remoteLogManagerFetchMaxBytesPerSecond == that.remoteLogManagerFetchMaxBytesPerSecond
&& remoteLogManagerFetchNumQuotaSamples == that.remoteLogManagerFetchNumQuotaSamples
&& remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds;
&& remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds
&& remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs;
}

@Override
Expand All @@ -580,7 +592,7 @@ public int hashCode() {
remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps,
remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond,
remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond,
remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds);
remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs);
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) {
remoteLogMetadataManagerClass, "dummy.remote.log.metadata.class.path",
"listener.name", 1024 * 1024L, 1, 60000L, 100L, 60000L, 0.3, 10, 100, 100,
rsmPrefix, rsmProps, rlmmPrefix, rlmmProps, Long.MAX_VALUE, 11, 1,
Long.MAX_VALUE, 11, 1);
Long.MAX_VALUE, 11, 1, 500);

Map<String, Object> props = extractProps(expectedRemoteLogManagerConfig);
rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v));
Expand Down