Merged
37 changes: 22 additions & 15 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -24,6 +24,7 @@
import kafka.log.remote.quota.RLMQuotaManager;
import kafka.log.remote.quota.RLMQuotaManagerConfig;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.QuotaType;
import kafka.server.StopPartition;
import org.apache.kafka.common.KafkaException;
@@ -151,7 +152,7 @@ public class RemoteLogManager implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
private final RemoteLogManagerConfig rlmConfig;
private final KafkaConfig config;
private final int brokerId;
private final String logDir;
private final Time time;
@@ -192,7 +193,7 @@ public class RemoteLogManager implements Closeable {
/**
* Creates RemoteLogManager instance with the given arguments.
*
* @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
* @param config Configuration required for remote logging subsystem(tiered storage) at the broker level.
* @param brokerId id of the current broker.
* @param logDir directory of Kafka log segments.
* @param time Time instance.
@@ -202,7 +203,7 @@ public class RemoteLogManager implements Closeable {
* @param brokerTopicStats BrokerTopicStats instance to update the respective metrics.
* @param metrics Metrics instance
*/
public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
public RemoteLogManager(KafkaConfig config,
int brokerId,
String logDir,
String clusterId,
@@ -211,7 +212,7 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset,
BrokerTopicStats brokerTopicStats,
Metrics metrics) throws IOException {
this.rlmConfig = rlmConfig;
this.config = config;
this.brokerId = brokerId;
this.logDir = logDir;
this.clusterId = clusterId;
@@ -226,7 +227,8 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
rlmCopyQuotaManager = createRLMCopyQuotaManager();
rlmFetchQuotaManager = createRLMFetchQuotaManager();

indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
indexCache = new RemoteIndexCache(config.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
Member:

Not sure whether I have caught the context, so please feel free to correct me.

  1. IIRC, the dynamic configs are loaded by another thread, and hence we may NOT see the latest dynamically updated configs when creating the RemoteLogManager, right?
  2. Those configs (remote.log.manager.copy.max.bytes.per.second, remote.log.manager.fetch.max.bytes.per.second) can be updated by the reconfigure process, so it should be fine to initialize them with "stale" (static) configs after a broker restart, right?

Contributor Author (@kamalcph, Jun 16, 2024):

> 1. IIRC, the dynamic configs are loaded by another thread, and hence we may NOT see the latest dynamically updated configs when creating the RemoteLogManager, right?

No, KafkaServer/BrokerServer calls config.dynamicConfig.initialize before creating the RemoteLogManager instance, so the dynamic configs get updated in the KafkaConfig object but not in KafkaConfig.remoteLogManagerConfig().

I have tested the patch only with ZooKeeper. I think the behavior should be similar for KRaftMetadataCache/ConfigRepository.

> 2. Those configs (remote.log.manager.copy.max.bytes.per.second, remote.log.manager.fetch.max.bytes.per.second) can be updated by the reconfigure process, so it should be fine to initialize them with "stale" (static) configs after a broker restart, right?

This is correct: the reconfigure updates the dynamic value, but we are referring to the static value, as explained above.

Member:

> I have tested the patch only with ZooKeeper. I think the behavior should be similar for KRaftMetadataCache/ConfigRepository.

That is a good point, and maybe they do behave differently:

    zkClientOpt.foreach { zkClient =>

In KRaft, zkClientOpt is None, so it does not update the config with the dynamic parts.

Member:

Yes, I confirmed in KRaft, it won't have this issue.

Member:

> Yes, I confirmed in KRaft, it won't have this issue.

Sorry, I'm not sure which issue you confirmed. If we are talking about dynamic configs at startup: according to the above comments, it seems to me this fix, which tries to return the latest (dynamic) configs, works well only if Kafka is in ZK mode. In KRaft, this fix is a no-op as it still returns the static configs.

Please correct me if I'm lost.

Member:

@chia7712, yes, you're right! For KRaft, this fix is a no-op.

delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());

@@ -274,27 +276,29 @@ Duration quotaTimeout() {
}

RLMQuotaManager createRLMCopyQuotaManager() {
return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$,
return new RLMQuotaManager(copyQuotaManagerConfig(config), metrics, QuotaType.RLMCopy$.MODULE$,
"Tracking copy byte-rate for Remote Log Manager", time);
}

RLMQuotaManager createRLMFetchQuotaManager() {
return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMFetch$.MODULE$,
return new RLMQuotaManager(fetchQuotaManagerConfig(config), metrics, QuotaType.RLMFetch$.MODULE$,
"Tracking fetch byte-rate for Remote Log Manager", time);
}

public boolean isRemoteLogFetchQuotaExceeded() {
return rlmFetchQuotaManager.isQuotaExceeded();
}

static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) {
return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerCopyMaxBytesPerSecond(),
static RLMQuotaManagerConfig copyQuotaManagerConfig(KafkaConfig config) {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return new RLMQuotaManagerConfig(config.remoteLogManagerCopyMaxBytesPerSecond(),
rlmConfig.remoteLogManagerCopyNumQuotaSamples(),
rlmConfig.remoteLogManagerCopyQuotaWindowSizeSeconds());
}

static RLMQuotaManagerConfig fetchQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) {
return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerFetchMaxBytesPerSecond(),
static RLMQuotaManagerConfig fetchQuotaManagerConfig(KafkaConfig config) {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return new RLMQuotaManagerConfig(config.remoteLogManagerFetchMaxBytesPerSecond(),
rlmConfig.remoteLogManagerFetchNumQuotaSamples(),
rlmConfig.remoteLogManagerFetchQuotaWindowSizeSeconds());
}
@@ -311,6 +315,7 @@ private <T> T createDelegate(ClassLoader classLoader, String className) {

@SuppressWarnings("removal")
RemoteStorageManager createRemoteStorageManager() {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteStorageManager>() {
private final String classPath = rlmConfig.remoteStorageManagerClassPath();

@@ -327,13 +332,14 @@ public RemoteStorageManager run() {
}

private void configureRSM() {
final Map<String, Object> rsmProps = new HashMap<>(rlmConfig.remoteStorageManagerProps());
final Map<String, Object> rsmProps = new HashMap<>(config.remoteLogManagerConfig().remoteStorageManagerProps());
rsmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
remoteLogStorageManager.configure(rsmProps);
}

@SuppressWarnings("removal")
RemoteLogMetadataManager createRemoteLogMetadataManager() {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteLogMetadataManager>() {
private final String classPath = rlmConfig.remoteLogMetadataManagerClassPath();

@@ -360,7 +366,7 @@ private void configureRLMM() {
rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", e.securityProtocol().name);
});
// update the remoteLogMetadataProps here to override endpoint config if any
rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());
rlmmProps.putAll(config.remoteLogManagerConfig().remoteLogMetadataManagerProps());

rlmmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
rlmmProps.put(LOG_DIR_CONFIG, logDir);
@@ -412,7 +418,7 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
Map<String, Uuid> topicIds) {
LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);

if (this.rlmConfig.isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) {
if (config.remoteLogManagerConfig().isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) {
Member:

This is unrelated to this PR, but config.remoteLogManagerConfig always returns the same config even though the config is updated dynamically. That seems error-prone, since it means the "updated" config can return a remoteLogManagerConfig holding the "previous" configs.

Contributor Author (@kamalcph, Jun 16, 2024):

Yes, this is the main issue. We access the remote configurations via KafkaConfig.remoteLogManagerConfig.xyz(), but that reflects the value in the static server.properties file, not the dynamically updated one.

So, I changed the usages to KafkaConfig.xyz() to get the dynamically updated value.
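To illustrate the failure mode being discussed, here is a minimal, self-contained sketch (hypothetical names, not the actual Kafka classes): a nested config object that snapshots values at construction keeps returning the stale static value, while a getter that reads the live property map observes dynamic updates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the stale-snapshot problem; class and key names
// mirror the discussion but are not the real Kafka implementation.
class ServerConfig {
    private static final String COPY_QUOTA_KEY = "remote.log.manager.copy.max.bytes.per.second";

    private final Map<String, Long> props = new HashMap<>();
    // Nested config built once from the props at construction time.
    private final RemoteConfigSnapshot snapshot;

    ServerConfig(long copyMaxBytesPerSecond) {
        props.put(COPY_QUOTA_KEY, copyMaxBytesPerSecond);
        snapshot = new RemoteConfigSnapshot(props);
    }

    // Mimics a dynamic config update: only the live map changes.
    void updateDynamically(String key, long value) {
        props.put(key, value);
    }

    // Pattern used by this PR's fix: read the live map on every call.
    long copyMaxBytesPerSecond() {
        return props.get(COPY_QUOTA_KEY);
    }

    // Pattern being replaced: hand out the cached snapshot.
    RemoteConfigSnapshot remoteConfigSnapshot() {
        return snapshot;
    }

    static class RemoteConfigSnapshot {
        private final long copyMaxBytesPerSecond;

        RemoteConfigSnapshot(Map<String, Long> props) {
            // Value frozen here; later dynamic updates are invisible.
            this.copyMaxBytesPerSecond = props.get(COPY_QUOTA_KEY);
        }

        long copyMaxBytesPerSecond() {
            return copyMaxBytesPerSecond;
        }
    }

    public static void main(String[] args) {
        ServerConfig config = new ServerConfig(100L);
        config.updateDynamically(COPY_QUOTA_KEY, 500L);
        // The snapshot still reports the static value; the live getter sees the update.
        System.out.println(config.remoteConfigSnapshot().copyMaxBytesPerSecond()); // 100
        System.out.println(config.copyMaxBytesPerSecond()); // 500
    }
}
```

This is why the patch switches call sites from the cached KafkaConfig.remoteLogManagerConfig() object to getters on KafkaConfig itself for the dynamically reconfigurable properties.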

Member:

Since we already have RemoteLogManagerConfig for remote storage, could we avoid moving configs out of RemoteLogManagerConfig? Especially since we are trying to reduce the size of KafkaConfig.

If this PR aims to fix it for ZK mode, maybe we can make sure the remoteLogManagerConfig returned by config always has the latest configs. For example:

  1. config.remoteLogManagerConfig always creates a new remoteLogManagerConfig based on currentConfig
  2. KafkaConfig#updateCurrentConfig should also update the inner _remoteLogManagerConfig

@kamalcph WDYT?
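A sketch of option 1 from the suggestion above (hypothetical names, not the real KafkaConfig): rebuild the nested remote-log config from the live property map on every access, so callers of remoteLogManagerConfig() always observe dynamically updated values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: construct a fresh snapshot from the current props on
// each call instead of caching one at broker startup.
class RebuildingConfig {
    private static final String FETCH_QUOTA_KEY = "remote.log.manager.fetch.max.bytes.per.second";

    private final Map<String, Long> currentProps = new HashMap<>();

    RebuildingConfig(long fetchMaxBytesPerSecond) {
        currentProps.put(FETCH_QUOTA_KEY, fetchMaxBytesPerSecond);
    }

    // Mimics a dynamic reconfiguration applied to the current config.
    void updateDynamically(String key, long value) {
        currentProps.put(key, value);
    }

    // Option 1 above: build a new snapshot from currentProps every time.
    RemoteSnapshot remoteLogManagerConfig() {
        return new RemoteSnapshot(currentProps.get(FETCH_QUOTA_KEY));
    }

    static class RemoteSnapshot {
        private final long fetchMaxBytesPerSecond;

        RemoteSnapshot(long fetchMaxBytesPerSecond) {
            this.fetchMaxBytesPerSecond = fetchMaxBytesPerSecond;
        }

        long fetchMaxBytesPerSecond() {
            return fetchMaxBytesPerSecond;
        }
    }

    public static void main(String[] args) {
        RebuildingConfig config = new RebuildingConfig(100L);
        config.updateDynamically(FETCH_QUOTA_KEY, 250L);
        // The rebuilt snapshot reflects the dynamic update.
        System.out.println(config.remoteLogManagerConfig().fetchMaxBytesPerSecond()); // 250
    }
}
```

The trade-off versus the approach taken in this PR is a small allocation per access in exchange for keeping all remote-log properties inside RemoteLogManagerConfig rather than promoting them to KafkaConfig.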

Contributor Author:

Can we take up this refactoring as part of the KAFKA-16976 ticket?

Member:

@chia7712 Good point. A similar issue is raised in the comment. We can fix the bug with the current approach in 3.8 and finish cleaning up the raised issues in a follow-up as part of KAFKA-16976.

Member:

@kamalcph @satishd sounds good to me :)

throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled");
}

@@ -1742,9 +1748,10 @@ public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteL

void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
Consumer<RLMTask> convertToLeaderOrFollower) {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
topicIdPartition -> {
RLMTask task = new RLMTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
RLMTask task = new RLMTask(topicIdPartition, rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
// set this upfront when it is getting initialized instead of doing it after scheduling.
convertToLeaderOrFollower.accept(task);
LOGGER.info("Created a new task: {} and getting scheduled", task);
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
@@ -621,7 +621,7 @@

protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -871,6 +871,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami

def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP)

def remoteLogIndexFileCacheTotalSizeBytes: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)

def remoteLogManagerCopyMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)

def remoteLogManagerFetchMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)

validateValues()

@nowarn("cat=deprecation")
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
@@ -691,7 +691,7 @@ class KafkaServer(

protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>