diff --git a/.gitignore b/.gitignore index 3080a0d60b1d8..ce10edc5202b7 100644 --- a/.gitignore +++ b/.gitignore @@ -56,3 +56,5 @@ jmh-benchmarks/src/main/generated raft/.jqwik-database **/src/generated **/src/generated-test + +storage/kafka-tiered-storage/ diff --git a/build.gradle b/build.gradle index b79cec202e09f..76f04ab4d5b77 100644 --- a/build.gradle +++ b/build.gradle @@ -820,6 +820,7 @@ project(':core') { implementation project(':server-common') implementation project(':metadata') + implementation project(':storage:api') implementation project(':raft') implementation project(':storage') @@ -852,6 +853,8 @@ project(':core') { testImplementation project(':clients').sourceSets.test.output testImplementation project(':metadata').sourceSets.test.output testImplementation project(':raft').sourceSets.test.output + testImplementation project(':storage:api').sourceSets.test.output + testImplementation libs.bcpkix testImplementation libs.mockitoCore testImplementation libs.easymock diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMovedToTieredStorageException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMovedToTieredStorageException.java new file mode 100644 index 0000000000000..6ea48c604d1bb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMovedToTieredStorageException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.errors; + +public class OffsetMovedToTieredStorageException extends ApiException { + + private static final long serialVersionUID = 1L; + + public OffsetMovedToTieredStorageException(String message) { + super(message); + } + + public OffsetMovedToTieredStorageException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index f48ae6c2332b4..5137ead3fff6d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -86,6 +86,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge; import org.apache.kafka.common.errors.OffsetNotAvailableException; import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.errors.OffsetMovedToTieredStorageException; import org.apache.kafka.common.errors.OperationNotAttemptedException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.PolicyViolationException; @@ -364,7 +365,8 @@ public enum Errors { INCONSISTENT_TOPIC_ID(103, "The log's topic ID did not match the topic ID in the request", InconsistentTopicIdException::new), INCONSISTENT_CLUSTER_ID(104, "The clusterId in the request does not match that found on the server", InconsistentClusterIdException::new), TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new), - FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new); + FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new), + OFFSET_MOVED_TO_TIERED_STORAGE(107, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java new file mode 100644 index 0000000000000..c1d9af8f198c9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC; +import static org.apache.kafka.common.record.Records.LOG_OVERHEAD; +import static org.apache.kafka.common.record.Records.MAGIC_OFFSET; +import static org.apache.kafka.common.record.Records.OFFSET_OFFSET; +import static org.apache.kafka.common.record.Records.SIZE_OFFSET; + +public class RemoteLogInputStream implements LogInputStream { + private final InputStream is; + private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC); + + public RemoteLogInputStream(InputStream is) { + this.is = is; + } + + @Override + public RecordBatch nextBatch() throws IOException { + logHeaderBuffer.rewind(); + Utils.readFully(is, logHeaderBuffer); + + if (logHeaderBuffer.position() < HEADER_SIZE_UP_TO_MAGIC) + return null; + + logHeaderBuffer.rewind(); + logHeaderBuffer.getLong(OFFSET_OFFSET); + int size = logHeaderBuffer.getInt(SIZE_OFFSET); + + // V0 has the smallest overhead, stricter checking is done later + if (size < LegacyRecord.RECORD_OVERHEAD_V0) + throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " + + "overhead (%d).", size, LegacyRecord.RECORD_OVERHEAD_V0)); + + byte magic = logHeaderBuffer.get(MAGIC_OFFSET); + ByteBuffer buffer = ByteBuffer.allocate(size + LOG_OVERHEAD); + buffer.put(logHeaderBuffer); + buffer.position(logHeaderBuffer.limit()); + + Utils.readFully(is, buffer); + if (buffer.position() != size + LOG_OVERHEAD) + return null; + buffer.rewind(); + + MutableRecordBatch batch; + if (magic > RecordBatch.MAGIC_VALUE_V1) + batch = new DefaultRecordBatch(buffer); + else + batch = new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(buffer); + + return batch; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java index 6b7734aca406c..efdc7da2afe1e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java @@ -42,6 +42,11 @@ public class ListOffsetsRequest extends AbstractRequest { public static final long LATEST_TIMESTAMP = -1L; public static final long MAX_TIMESTAMP = -3L; + /** + * It is used to represent the earliest message stored in the local log which is also called the local-log-start-offset + */ + public static final long EARLIEST_LOCAL_TIMESTAMP = -4L; + public static final int CONSUMER_REPLICA_ID = -1; public static final int DEBUGGING_REPLICA_ID = -2; diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java b/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java new file mode 100644 index 0000000000000..85e865e098307 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import org.apache.kafka.common.KafkaException; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.Locale; +import java.util.NoSuchElementException; + +/** + * A class loader that looks for classes and resources in a specified class path first, before delegating to its parent + * class loader. + */ +public class ChildFirstClassLoader extends URLClassLoader { + static { + ClassLoader.registerAsParallelCapable(); + } + + /** + * @param classPath Class path string + * @param parent The parent classloader. If the required class / resource cannot be found in the given classPath, + * this classloader will be used to find the class / resource. + */ + public ChildFirstClassLoader(String classPath, ClassLoader parent) { + super(classpathToURLs(classPath), parent); + } + + static private URL[] classpathToURLs(String classPath) { + ArrayList urls = new ArrayList<>(); + for (String path : classPath.split(File.pathSeparator)) { + if (path == null || path.trim().isEmpty()) + continue; + File file = new File(path); + + try { + if (path.endsWith("/*")) { + File parent = new File(new File(file.getCanonicalPath()).getParent()); + if (parent.isDirectory()) { + File[] files = parent.listFiles((dir, name) -> { + String lower = name.toLowerCase(Locale.ROOT); + return lower.endsWith(".jar") || lower.endsWith(".zip"); + }); + if (files != null) { + for (File jarFile : files) { + urls.add(jarFile.getCanonicalFile().toURI().toURL()); + } + } + } + } else if (file.exists()) { + urls.add(file.getCanonicalFile().toURI().toURL()); + } + } catch (IOException e) { + throw new KafkaException(e); + } + } + return urls.toArray(new URL[0]); + } + + @Override + protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + synchronized (getClassLoadingLock(name)) { + Class c = findLoadedClass(name); + + if (c == null) { + try { + c = findClass(name); + } catch (ClassNotFoundException e) { + // Try parent + c = super.loadClass(name, false); + } + } + + if (resolve) + resolveClass(c); + + return c; + } + } + + @Override + public URL getResource(String name) { + URL url = findResource(name); + if (url == null) { + // try parent + url = super.getResource(name); + } + return url; + } + + @Override + public Enumeration getResources(String name) throws IOException { + Enumeration urls1 = findResources(name); + Enumeration urls2 = getParent() != null ? 
getParent().getResources(name) : null; + + return new Enumeration<URL>() { + @Override + public boolean hasMoreElements() { + return (urls1 != null && urls1.hasMoreElements()) || (urls2 != null && urls2.hasMoreElements()); + } + + @Override + public URL nextElement() { + if (urls1 != null && urls1.hasMoreElements()) + return urls1.nextElement(); + if (urls2 != null && urls2.hasMoreElements()) + return urls2.nextElement(); + throw new NoSuchElementException(); + } + }; + } +} diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json index df639579bd772..6ab975ca54531 100644 --- a/clients/src/main/resources/common/message/FetchRequest.json +++ b/clients/src/main/resources/common/message/FetchRequest.json @@ -48,7 +48,9 @@ // the `LastFetchedEpoch` field // // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. - "validVersions": "0-13", + // + // Version 14 is the same as version 13 but it can also return a new error called OffsetMovedToTieredStorageException (KIP-405) + "validVersions": "0-14", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json index 9ae28b7d6169b..7d144f0183164 100644 --- a/clients/src/main/resources/common/message/FetchResponse.json +++ b/clients/src/main/resources/common/message/FetchResponse.json @@ -41,7 +41,9 @@ // and leader discovery through the `CurrentLeader` field // // Version 13 replaces the topic name field with topic ID (KIP-516). - "validVersions": "0-13", + // + // Version 14 is the same as version 13 but it can also return a new error called OffsetMovedToTieredStorageException (KIP-405) + "validVersions": "0-14", "flexibleVersions": "12+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json index 93c920ee2fe97..4e4d07ed49f93 100644 --- a/clients/src/main/resources/common/message/ListOffsetsRequest.json +++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json @@ -32,7 +32,9 @@ // Version 6 enables flexible versions. // // Version 7 enables listing offsets by max timestamp (KIP-734). - "validVersions": "0-7", + // + // Version 8 enables listing offsets by local log start offset (KIP-405). + "validVersions": "0-8", "flexibleVersions": "6+", "fields": [ { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json b/clients/src/main/resources/common/message/ListOffsetsResponse.json index 6d6be0fdf4f59..25a94ae62dbbf 100644 --- a/clients/src/main/resources/common/message/ListOffsetsResponse.json +++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json @@ -31,7 +31,10 @@ // Version 6 enables flexible versions. // // Version 7 is the same as version 6 (KIP-734). - "validVersions": "0-7", + // + // Version 8 enables listing offsets by local timestamp. + // This is the earliest offset stored in the local log, also called the local-log-start-offset. (KIP-405).
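+  // Clients ask for this offset with the new EARLIEST_LOCAL_TIMESTAMP sentinel (-4) defined in ListOffsetsRequest.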
+ "validVersions": "0-8", "flexibleVersions": "6+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true, diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index a0051784b4d73..6190270d772e7 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -18,6 +18,7 @@ package kafka.server.builders; import kafka.log.LogManager; +import kafka.log.remote.RemoteLogManager; import kafka.server.AlterIsrManager; import kafka.server.BrokerTopicStats; import kafka.server.DelayedDeleteRecords; @@ -53,6 +54,7 @@ public class ReplicaManagerBuilder { private AlterIsrManager alterIsrManager = null; private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(); private AtomicBoolean isShuttingDown = new AtomicBoolean(false); + private Optional remoteLogManager = Optional.empty(); private Optional zkClient = Optional.empty(); private Optional> delayedProducePurgatory = Optional.empty(); private Optional> delayedFetchPurgatory = Optional.empty(); @@ -85,6 +87,11 @@ public ReplicaManagerBuilder setLogManager(LogManager logManager) { return this; } + public ReplicaManagerBuilder setRemoteLogManager(RemoteLogManager remoteLogManager) { + this.remoteLogManager = Optional.ofNullable(remoteLogManager); + return this; + } + public ReplicaManagerBuilder setQuotaManagers(QuotaManagers quotaManagers) { this.quotaManagers = quotaManagers; return this; @@ -157,6 +164,7 @@ public ReplicaManager build() { time, scheduler, logManager, + OptionConverters.toScala(remoteLogManager), quotaManagers, metadataCache, logDirFailureChannel, diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index 8165e6c6f1e5a..ad316cd15e932 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -119,7 +119,12 @@ object ApiVersion { // Assume message format version is 3.0 (KIP-724) KAFKA_3_0_IV1, // Adds topic IDs to Fetch requests/responses (KIP-516) - KAFKA_3_1_IV0 + KAFKA_3_1_IV0, + // Introduce ListOffsets V8 that supports listing offsets by earliest local time stamp, + // which is local log start offset. 
(KIP-405) + // Introduced FetchRequest V14 is the same as version 13 but it also receives a new error called + // OffsetMovedToTieredStorageException(KIP-405) + KAFKA_3_2_IV0 ) // Map keys are the union of the short and full versions @@ -477,6 +482,13 @@ case object KAFKA_3_1_IV0 extends DefaultApiVersion { val id: Int = 35 } +case object KAFKA_3_2_IV0 extends DefaultApiVersion { + val shortVersion: String = "3.2" + val subVersion = "IV0" + val recordVersion = RecordVersion.V2 + val id: Int = 36 +} + object ApiVersionValidator extends Validator { override def ensureValid(name: String, value: Any): Unit = { diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 371e89555e54a..ee6d18e3c23ca 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -1172,7 +1172,7 @@ class Partition(val topicPartition: TopicPartition, case ListOffsetsRequest.LATEST_TIMESTAMP => maybeOffsetsError.map(e => throw e) .orElse(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch)))) - case ListOffsetsRequest.EARLIEST_TIMESTAMP => + case ListOffsetsRequest.EARLIEST_TIMESTAMP | ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP => getOffsetByTimestamp case _ => getOffsetByTimestamp.filter(timestampAndOffset => timestampAndOffset.offset < lastFetchableOffset) diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index 31b9f6d8dd71e..97bbfc8365270 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -17,26 +17,26 @@ package kafka.log -import java.io.{Closeable, File, RandomAccessFile} +import kafka.common.IndexOffsetOverflowException +import kafka.utils.CoreUtils.inLock +import kafka.utils.{CoreUtils, Logging} +import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem} + +import java.io.{File, RandomAccessFile} import java.nio.channels.FileChannel import java.nio.file.Files import java.nio.{ByteBuffer, MappedByteBuffer} import java.util.concurrent.locks.{Lock, ReentrantLock} -import kafka.common.IndexOffsetOverflowException -import kafka.utils.CoreUtils.inLock -import kafka.utils.{CoreUtils, Logging} -import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils} - /** * The abstract index class which holds entry format agnostic methods. * - * @param _file The index file + * @param indexFile The index file * @param baseOffset the base offset of the segment that this index is corresponding to. * @param maxIndexSize The maximum index size in bytes. 
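+ * @param writable True if this index is writable (its memory map is opened read-write); false if it is read-only.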
*/ -abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1, - val writable: Boolean) extends Closeable { +abstract class AbstractIndex(@volatile private var indexFile: File, val baseOffset: Long, val maxIndexSize: Int = -1, + val writable: Boolean) extends CleanableIndex(indexFile) { import AbstractIndex._ // Length of the index file @@ -152,15 +152,13 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: */ def isFull: Boolean = _entries >= _maxEntries - def file: File = _file - def maxEntries: Int = _maxEntries def entries: Int = _entries def length: Long = _length - def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName) + def updateParentDir(parentDir: File): Unit = indexFile = new File(parentDir, file.getName) /** * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in @@ -201,16 +199,6 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: } } - /** - * Rename the file that backs this offset index - * - * @throws IOException if rename fails - */ - def renameTo(f: File): Unit = { - try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) - finally _file = f - } - /** * Flush the data in the index to disk */ diff --git a/core/src/main/scala/kafka/log/CleanableIndex.scala b/core/src/main/scala/kafka/log/CleanableIndex.scala new file mode 100644 index 0000000000000..a8d60c5e8cd9f --- /dev/null +++ b/core/src/main/scala/kafka/log/CleanableIndex.scala @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log + +import java.io.{Closeable, File} +import java.nio.file.Path + +import org.apache.kafka.common.utils.Utils + +/** + * This class represents a common abstraction for operations like delete and rename of the index files. 
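+ * Subclasses only need to implement deleteIfExists(); renameTo() is provided here and atomically moves the backing file, when it exists, before updating the in-memory file reference.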
+ * + * @param _file index file + * @see [[AbstractIndex]], [[OffsetIndex]], [[TimeIndex]], [[TransactionIndex]] + */ +abstract class CleanableIndex(@volatile var _file: File) extends Closeable { + + /** + * Rename the file that backs this offset index + * + * @throws IOException if rename fails + */ + def renameTo(toBeRenamedFile: File): Unit = { + try + if (_file.exists) Utils.atomicMoveWithFallback(_file.toPath, toBeRenamedFile.toPath, false) + finally _file = toBeRenamedFile + } + + def file: File = _file + + def path: Path = if (file.exists()) file.toPath else null + + def deleteIfExists(): Boolean +} diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index c4ee18c3bc14f..b6bc08fe17765 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -19,6 +19,7 @@ package kafka.log import kafka.api.ApiVersion import kafka.log.LogConfig.MessageFormatVersion +import kafka.log.remote.RemoteIndexCache import java.io._ import java.nio.file.Files @@ -354,7 +355,11 @@ class LogManager(logDirs: Seq[File], } val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir => - logDir.isDirectory && UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic) + logDir.isDirectory && + UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic && + // Ignore remote-log-index-cache directory as that is index cache maintained by tiered storage subsystem + // but not any topic-partition dir. + !logDir.getName.equals(RemoteIndexCache.DirName)) val numLogsLoaded = new AtomicInteger(0) numTotalLogs += logsToLoad.length diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 5f5c22583fe01..81e9beab4071b 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -504,6 +504,12 @@ class ProducerStateManager(val topicPartition: TopicPartition, // completed transactions whose markers are at offsets above the high watermark private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata] + def reloadSnapshots(): Unit = { + info("Reloading the producer state snapshots") + truncateFullyAndStartAt(0L) + snapshots = loadSnapshots() + } + /** * Load producer state snapshots by scanning the _logDir. */ diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala index ca3d1bb31012d..8fe7b5cee7dab 100644 --- a/core/src/main/scala/kafka/log/TransactionIndex.scala +++ b/core/src/main/scala/kafka/log/TransactionIndex.scala @@ -41,13 +41,14 @@ private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTx * order to find the start of the transactions. 
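+ * The index now extends CleanableIndex so that the rename and delete handling is shared with the offset and time indexes.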
*/ @nonthreadsafe -class TransactionIndex(val startOffset: Long, @volatile private var _file: File) extends Logging { +class TransactionIndex(val startOffset: Long, @volatile private var indexFile: File) extends CleanableIndex(indexFile) + with Logging { // note that the file is not created until we need it @volatile private var maybeChannel: Option[FileChannel] = None private var lastOffset: Option[Long] = None - if (_file.exists) + if (indexFile.exists) openChannel() def append(abortedTxn: AbortedTxn): Unit = { @@ -62,9 +63,7 @@ class TransactionIndex(val startOffset: Long, @volatile private var _file: File) def flush(): Unit = maybeChannel.foreach(_.force(true)) - def file: File = _file - - def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName) + def updateParentDir(parentDir: File): Unit = indexFile = new File(parentDir, file.getName) /** * Delete this index. @@ -106,13 +105,6 @@ class TransactionIndex(val startOffset: Long, @volatile private var _file: File) maybeChannel = None } - def renameTo(f: File): Unit = { - try { - if (file.exists) - Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) - } finally _file = f - } - def truncateTo(offset: Long): Unit = { val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize) var newLastOffset: Option[Long] = None diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 7332549e06954..0ca65e3892410 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -18,21 +18,18 @@ package kafka.log import com.yammer.metrics.core.MetricName - -import java.io.{File, IOException} -import java.nio.file.Files -import java.util.Optional -import java.util.concurrent.TimeUnit import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0} import kafka.common.{LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.log.AppendOrigin.RaftLeader +import kafka.log.remote.RemoteLogManager import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec} import kafka.metrics.KafkaMetricsGroup +import kafka.server._ import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.LeaderEpochFileCache -import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile, RequestLocal} import kafka.utils._ import org.apache.kafka.common.errors._ +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.{DescribeProducersResponseData, FetchResponseData} import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ @@ -41,11 +38,16 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_ import org.apache.kafka.common.requests.ProduceResponse.RecordError import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} +import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig +import java.io.{File, IOException} +import java.nio.file.Files +import java.util.Optional +import java.util.concurrent.TimeUnit import scala.annotation.nowarn -import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer import scala.collection.{Seq, immutable, mutable} +import scala.jdk.CollectionConverters._ object LogAppendInfo { 
val UnknownLogAppendInfo = LogAppendInfo(None, -1, None, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, @@ -248,6 +250,8 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason { * is downgraded below 2.8, a topic ID may be lost and a new ID generated upon re-upgrade. * If the inter-broker protocol version on a ZK cluster is below 2.8, partition.metadata * will be deleted to avoid ID conflicts upon re-upgrade. + * @param remoteStorageSystemEnable flag to indicate whether the system level remote log storage is enabled or not. + * @param remoteLogManager Optional RemoteLogManager instance if it exists. */ @threadsafe class UnifiedLog(@volatile var logStartOffset: Long, @@ -257,7 +261,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, @volatile var leaderEpochCache: Option[LeaderEpochFileCache], val producerStateManager: ProducerStateManager, @volatile private var _topicId: Option[Uuid], - val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { + val keepPartitionMetadataFile: Boolean, + val remoteStorageSystemEnable: Boolean = false, + remoteLogManager: Option[RemoteLogManager] = None) extends Logging with KafkaMetricsGroup { import kafka.log.UnifiedLog._ @@ -288,6 +294,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, @volatile var partitionMetadataFile : PartitionMetadataFile = null + //todo-tier it needs to be updated. + private val localLogStartOffset: Long = logStartOffset + locally { initializePartitionMetadata() updateLogStartOffset(logStartOffset) @@ -295,6 +304,14 @@ class UnifiedLog(@volatile var logStartOffset: Long, initializeTopicId() } + def remoteLogEnabled(): Boolean = { + // Remote logging is enabled only for non-compact and non-internal topics + remoteStorageSystemEnable && + !(config.compact || Topic.isInternal(topicPartition.topic()) + || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.eq(topicPartition.topic())) && + config.remoteLogConfig.remoteStorageEnable + } + /** * Initialize topic ID information for the log by maintaining the partition metadata file and setting the in-memory _topicId. * Delete partition metadata file if the version does not support topic IDs. @@ -550,6 +567,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, explicitMetricName(pkgStr, "Log", name, tags) } + def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized { + rebuildProducerState(lastOffset, producerStateManager, reloadFromCleanShutdown) + maybeIncrementFirstUnstableOffset() + } + private def recordVersion: RecordVersion = config.recordVersion private def initializePartitionMetadata(): Unit = lock synchronized { @@ -608,10 +630,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be // free of all side-effects, i.e. it must not update any log-specific state. 
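+  // reloadFromCleanShutdown defaults to false so existing recovery-path callers keep their behaviour; the new loadProducerState method passes the caller-supplied value through.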
private def rebuildProducerState(lastOffset: Long, - producerStateManager: ProducerStateManager): Unit = lock synchronized { + producerStateManager: ProducerStateManager, + reloadFromCleanShutdown: Boolean = false): Unit = lock synchronized { localLog.checkIfMemoryMappedBufferClosed() UnifiedLog.rebuildProducerState(producerStateManager, localLog.segments, logStartOffset, lastOffset, recordVersion, time, - reloadFromCleanShutdown = false, logIdent) + reloadFromCleanShutdown, logIdent) } def activeProducers: Seq[DescribeProducersResponseData.ProducerState] = { @@ -1222,13 +1245,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (config.messageFormatVersion < KAFKA_0_10_0_IV0 && targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP && + targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP && targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP) throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " + s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " + s"required version $KAFKA_0_10_0_IV0") // For the earliest and latest, we do not need to return the timestamp. - if (targetTimestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) { + if (targetTimestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP || + (!remoteLogEnabled() && targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)) { // The first cached epoch usually corresponds to the log start offset, but we have to verify this since // it may not be true following a message format version bump as the epoch will not be available for // log entries written in the older format. @@ -1238,6 +1263,14 @@ class UnifiedLog(@volatile var logStartOffset: Long, case _ => Optional.empty[Integer]() } Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt)) + } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { + val earliestLocalLogEpochEntry = leaderEpochCache.flatMap(cache => + cache.epochForOffset(localLogStartOffset).flatMap(cache.getEpochEntry)) + val epochOpt = earliestLocalLogEpochEntry match { + case Some(entry) if entry.startOffset <= localLogStartOffset => Optional.of[Integer](entry.epoch) + case _ => Optional.empty[Integer]() + } + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, localLogStartOffset, epochOpt)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer]) val epochOptional = Optional.ofNullable(latestEpochOpt.orNull) @@ -1254,12 +1287,30 @@ class UnifiedLog(@volatile var logStartOffset: Long, latestTimestampAndOffset.offset, epochOptional)) } else { - // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides - // constant time access while being safe to use with concurrent collections unlike `toArray`. - val segmentsCopy = logSegments.toBuffer // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. 
- val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp) - targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset)) + val remoteOffset = if (remoteLogEnabled()) { + if (remoteLogManager.isEmpty) { + throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled."); + } + if (leaderEpochCache.isEmpty) { + throw new KafkaException("Tiered storage is supported only with versions supporting leader epochs, that means RecordVersion must be >= 2.") + } + + remoteLogManager.get.findOffsetByTimestamp(topicPartition, targetTimestamp, logStartOffset, leaderEpochCache.get) + } else None + + if (remoteOffset.nonEmpty) { + remoteOffset + } else { + // If it is not found in remote storage, search in the local storage starting with local log start offset. + + // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides + // constant time access while being safe to use with concurrent collections unlike `toArray`. + val segmentsCopy = logSegments.toBuffer + + val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp) + targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, localLogStartOffset)) + } } } } @@ -1287,6 +1338,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, startIndex = offsetTimeArray.length - 1 case ListOffsetsRequest.EARLIEST_TIMESTAMP => startIndex = 0 + case ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP => + startIndex = 0 case _ => var isFound = false debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2))) diff --git a/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala b/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala new file mode 100644 index 0000000000000..d35c70ed85ec3 --- /dev/null +++ b/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote + +import org.apache.kafka.server.log.remote.storage.{LogSegmentData, RemoteLogSegmentMetadata, RemoteStorageManager} + +import java.io.InputStream +import java.util + +/** + * A wrapper class of RemoteStorageManager that sets the context class loader when calling RSM methods. 
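+ * This lets a plugin loaded through ChildFirstClassLoader resolve its own dependencies instead of the broker's, and the caller's original context class loader is restored after every delegated call.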
+ */ +class ClassLoaderAwareRemoteStorageManager(val rsm: RemoteStorageManager, + val rsmClassLoader: ClassLoader) extends RemoteStorageManager { + + def withClassLoader[T](fun: => T): T = { + val originalClassLoader = Thread.currentThread.getContextClassLoader + Thread.currentThread.setContextClassLoader(rsmClassLoader) + try { + fun + } finally { + Thread.currentThread.setContextClassLoader(originalClassLoader) + } + } + + def delegate(): RemoteStorageManager = { + rsm + } + + override def close(): Unit = withClassLoader { + rsm.close() + } + + override def configure(configs: util.Map[String, _]): Unit = withClassLoader { + rsm.configure(configs) + } + + override def copyLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, + logSegmentData: LogSegmentData): Unit = withClassLoader { + rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData) + } + + override def fetchLogSegment(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, + startPosition: Int): InputStream = withClassLoader { + rsm.fetchLogSegment(remoteLogSegmentMetadata, startPosition) + } + + override def fetchLogSegment(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, + startPosition: Int, + endPosition: Int): InputStream = withClassLoader { + rsm.fetchLogSegment(remoteLogSegmentMetadata, startPosition, endPosition) + } + + override def fetchIndex(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, + indexType: RemoteStorageManager.IndexType): InputStream = withClassLoader { + rsm.fetchIndex(remoteLogSegmentMetadata, indexType) + } + + override def deleteLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Unit = withClassLoader { + rsm.deleteLogSegmentData(remoteLogSegmentMetadata) + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala new file mode 100644 index 0000000000000..a9b51b83bbb7b --- /dev/null +++ b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote + +import kafka.log._ +import kafka.utils.{CoreUtils, Logging, ShutdownableThread} +import org.apache.kafka.common.errors.CorruptRecordException +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType +import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager} + +import java.io.{File, InputStream} +import java.nio.file.{Files, Path} +import java.util +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +object RemoteIndexCache { + val DirName = "remote-log-index-cache" + val TmpFileSuffix = ".tmp" + val OffsetIndexFileSuffix = ".oi" + val TimeIndexFileSuffix = ".ti" + val TxnIndexFileSuffix = ".tx" +} + +class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) { + private val markedForCleanup = new AtomicBoolean(false) + + def lookupOffset(targetOffset: Long): OffsetPosition = { + if (markedForCleanup.get()) throw new IllegalStateException("This entry is already closed") + else offsetIndex.lookup(targetOffset) + } + + def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = { + if (markedForCleanup.get()) throw new IllegalStateException("This entry is already closed") + + val timestampOffset = timeIndex.lookup(timestamp) + offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset)) + } + + def markForCleanup(): Unit = { + if (!markedForCleanup.getAndSet(true)) { + Array(offsetIndex, timeIndex, txnIndex).foreach(x => + x.renameTo(new File(CoreUtils.replaceSuffix(x.file.getPath, "", UnifiedLog.DeletedFileSuffix)))) + } + } + + def cleanup(): Unit = { + markForCleanup() + CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists())) + } + + def close(): Unit = { + Array(offsetIndex, timeIndex, txnIndex).foreach(index => try { + index.close() + } catch { + case _: Exception => // ignore error. + }) + } +} + +/** + * This is a LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. This is helpful to avoid + * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. + * + * @param maxSize + * @param remoteStorageManager + * @param logDir + */ +//todo-tier make maxSize configurable +class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageManager, logDir: String) extends Logging { + + val cacheDir = new File(logDir, "remote-log-index-cache") + @volatile var closed = false + + val expiredIndexes = new LinkedBlockingQueue[Entry]() + val lock = new Object() + + val entries: util.Map[RemoteLogSegmentId, Entry] = new java.util.LinkedHashMap[RemoteLogSegmentId, Entry](maxSize / 2, + 0.75f, true) { + override def removeEldestEntry(eldest: util.Map.Entry[RemoteLogSegmentId, Entry]): Boolean = { + if (this.size() > maxSize) { + val entry = eldest.getValue + // Mark the entries for cleanup, background thread will clean them later. + entry.markForCleanup() + expiredIndexes.add(entry) + true + } else { + false + } + } + } + + private def init(): Unit = { + if (cacheDir.mkdir()) + info(s"Created $cacheDir successfully") + + // Delete any .deleted files remained from the earlier run of the broker. 
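+    // These are index files that were renamed for cleanup but not removed before the broker last stopped.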
+ Files.list(cacheDir.toPath).forEach((path: Path) => { + if (path.endsWith(UnifiedLog.DeletedFileSuffix)) { + Files.deleteIfExists(path) + } + }) + } + + init() + + // Start cleaner thread that will clean the expired entries + val cleanerThread: ShutdownableThread = new ShutdownableThread("remote-log-index-cleaner") { + setDaemon(true) + + override def doWork(): Unit = { + val entry = expiredIndexes.take() + info(s"Cleaning up index entry $entry") + try { + entry.cleanup() + } catch { + case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex) + } + } + } + cleanerThread.start() + + def getIndexEntry(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Entry = { + def loadIndexFile[T <: CleanableIndex](fileName: String, + suffix: String, + fetchRemoteIndex: RemoteLogSegmentMetadata => InputStream, + readIndex: File => T): T = { + val indexFile = new File(cacheDir, fileName + suffix) + + def fetchAndCreateIndex(): T = { + val tmpIndexFile = new File(cacheDir, fileName + suffix + RemoteIndexCache.TmpFileSuffix) + + val inputStream = fetchRemoteIndex(remoteLogSegmentMetadata) + try { + Files.copy(inputStream, tmpIndexFile.toPath) + } finally { + if (inputStream != null) { + inputStream.close() + } + } + + Utils.atomicMoveWithFallback(tmpIndexFile.toPath, indexFile.toPath) + readIndex(indexFile) + } + + if (indexFile.exists()) { + try { + readIndex(indexFile) + } catch { + case ex: CorruptRecordException => + info("Error occurred while loading the stored index", ex) + fetchAndCreateIndex() + } + } else { + fetchAndCreateIndex() + } + } + + lock synchronized { + entries.computeIfAbsent(remoteLogSegmentMetadata.remoteLogSegmentId(), (key: RemoteLogSegmentId) => { + val fileName = key.id().toString + val startOffset = remoteLogSegmentMetadata.startOffset() + + val offsetIndex: OffsetIndex = loadIndexFile(fileName, RemoteIndexCache.OffsetIndexFileSuffix, + rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.OFFSET), + file => { + val index = new OffsetIndex(file, startOffset, Int.MaxValue, writable = false) + index.sanityCheck() + index + }) + + val timeIndex: TimeIndex = loadIndexFile(fileName, RemoteIndexCache.TimeIndexFileSuffix, + rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TIMESTAMP), + file => { + val index = new TimeIndex(file, startOffset, Int.MaxValue, writable = false) + index.sanityCheck() + index + }) + + val txnIndex: TransactionIndex = loadIndexFile(fileName, RemoteIndexCache.TxnIndexFileSuffix, + rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION), + file => { + val index = new TransactionIndex(startOffset, file) + index.sanityCheck() + index + }) + + new Entry(offsetIndex, timeIndex, txnIndex) + }) + } + } + + def lookupOffset(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, offset: Long): Int = { + getIndexEntry(remoteLogSegmentMetadata).lookupOffset(offset).position + } + + def lookupTimestamp(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Int = { + getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position + } + + def close(): Unit = { + closed = true + cleanerThread.shutdown() + // Close all the opened indexes. 
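+    // The index files themselves are left on disk so they can be reused when the cache is re-populated after a restart.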
+ lock synchronized { + entries.values().stream().forEach(entry => entry.close()) + } + } + +} diff --git a/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala new file mode 100644 index 0000000000000..af2b2a397263f --- /dev/null +++ b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote + +import kafka.cluster.Partition +import kafka.metrics.KafkaMetricsGroup +import kafka.server.KafkaConfig +import kafka.server.checkpoints.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile} +import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} +import kafka.utils.Logging +import org.apache.kafka.common._ +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.record.FileRecords.TimestampAndOffset +import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream} +import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils} +import org.apache.kafka.server.common.CheckpointFile.CheckpointWriteBuffer +import org.apache.kafka.server.log.remote.metadata.storage.{ClassLoaderAwareRemoteLogMetadataManager, TopicBasedRemoteLogMetadataManagerConfig} +import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager} + +import java.io.{BufferedWriter, ByteArrayOutputStream, Closeable, InputStream, OutputStreamWriter} +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.security.{AccessController, PrivilegedAction} +import java.util +import java.util.Optional +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} +import scala.collection.Set +import scala.jdk.CollectionConverters._ + +/** + * This class is responsible for + * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances. + * - receives any leader and follower replica events and partition stop events and act on them + * - also provides APIs to fetch indexes, metadata about remote log segments. 
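+ * - loads the configured RemoteStorageManager and RemoteLogMetadataManager implementations, optionally through a child-first class loader so that plugin jars do not conflict with broker classes.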
+ * + * @param rlmConfig + * @param brokerId + * @param logDir + */ +class RemoteLogManager(rlmConfig: RemoteLogManagerConfig, + brokerId: Int, + logDir: String) extends Logging with Closeable with KafkaMetricsGroup { + + // topic ids received on leadership changes + private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]() + + private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager() + private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager() + + private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir) + + private var closed = false + + private[remote] def createRemoteStorageManager(): RemoteStorageManager = { + def createDelegate(classLoader: ClassLoader): RemoteStorageManager = { + classLoader.loadClass(rlmConfig.remoteStorageManagerClassName()) + .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager] + } + + AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] { + private val classPath = rlmConfig.remoteStorageManagerClassPath() + + override def run(): RemoteStorageManager = { + if (classPath != null && classPath.trim.nonEmpty) { + val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader) + val delegate = createDelegate(classLoader) + new ClassLoaderAwareRemoteStorageManager(delegate, classLoader) + } else { + createDelegate(this.getClass.getClassLoader) + } + } + }) + } + + private def configureRSM(): Unit = { + val rsmProps = new util.HashMap[String, Any]() + rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) } + rsmProps.put(KafkaConfig.BrokerIdProp, brokerId) + remoteLogStorageManager.configure(rsmProps) + } + + private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = { + def createDelegate(classLoader: ClassLoader) = { + classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName()) + .getDeclaredConstructor() + .newInstance() + .asInstanceOf[RemoteLogMetadataManager] + } + + AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] { + private val classPath = rlmConfig.remoteLogMetadataManagerClassPath + + override def run(): RemoteLogMetadataManager = { + if (classPath != null && classPath.trim.nonEmpty) { + val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader) + val delegate = createDelegate(classLoader) + new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader) + } else { + createDelegate(this.getClass.getClassLoader) + } + } + }) + } + + private def configureRLMM(): Unit = { + val rlmmProps = new util.HashMap[String, Any]() + rlmConfig.remoteLogMetadataManagerProps().asScala.foreach { case (k, v) => rlmmProps.put(k, v) } + rlmmProps.put(KafkaConfig.BrokerIdProp, brokerId) + rlmmProps.put(KafkaConfig.LogDirProp, logDir) + remoteLogMetadataManager.configure(rlmmProps) + } + + def startup(): Unit = { + // Initialize and configure RSM and RLMM. This will start RSM, RLMM resources which may need to start resources + // in connecting to the brokers or remote storages. + configureRSM() + configureRLMM() + } + + def storageManager(): RemoteStorageManager = { + remoteLogStorageManager + } + + /** + * Callback to receive any leadership changes for the topic partitions assigned to this broker. 
If there are no + * existing tasks for a given topic partition then it will assign new leader or follower task else it will convert the + * task to respective target state(leader or follower). + * + * @param partitionsBecomeLeader partitions that have become leaders on this broker. + * @param partitionsBecomeFollower partitions that have become followers on this broker. + * @param topicIds topic name to topic id mappings. + */ + def onLeadershipChange(partitionsBecomeLeader: Set[Partition], + partitionsBecomeFollower: Set[Partition], + topicIds: util.Map[String, Uuid]): Unit = { + debug(s"Received leadership changes for leaders: $partitionsBecomeLeader and followers: $partitionsBecomeFollower") + + // Partitions logs are available when this callback is invoked. + // Compact topics and internal topics are filtered here as they are not supported with tiered storage. + def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = { + partitions.filterNot(partition => Topic.isInternal(partition.topic) || + partition.topicPartition.topic().equals(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME) || + partition.log.exists(log => log.remoteLogEnabled())).map(partition => + new TopicIdPartition(topicIds.get(partition.topic), partition.topicPartition)) + } + + val followerTopicPartitions = filterPartitions(partitionsBecomeFollower) + val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader) + debug(s"Effective topic partitions after filtering compact and internal topics, leaders: $leaderTopicPartitions " + + s"and followers: $followerTopicPartitions") + + if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) { + remoteLogMetadataManager.onPartitionLeadershipChanges(leaderTopicPartitions.asJava, followerTopicPartitions.asJava) + } + } + + /** + * Stops partitions for copying segments, building indexes and deletes the partition in remote storage if delete flag + * is set as true. + * + * @param topicPartition topic partition to be stopped. + * @param delete flag to indicate whether the given topic partitions to be deleted or not. + */ + def stopPartitions(topicPartition: TopicPartition, delete: Boolean): Unit = { + if (delete) { + // Delete from internal datastructures only if it is to be deleted. 
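+      // Only the cached topic-id mapping is dropped here; deleting the copied segments from remote storage is not handled by this method.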
+ val topicIdPartition = topicPartitionIds.remove(topicPartition) + debug(s"Removed partition: $topicIdPartition from topicPartitionIds") + } + } + + def fetchRemoteLogSegmentMetadata(topicPartition: TopicPartition, + epochForOffset: Int, + offset: Long): Optional[RemoteLogSegmentMetadata] = { + val topicId = topicPartitionIds.get(topicPartition) + + if (topicId == null) { + throw new KafkaException("No topic id registered for topic partition: " + topicPartition) + } + + remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset) + } + + private def lookupTimestamp(rlsMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Option[TimestampAndOffset] = { + val startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset) + + var remoteSegInputStream: InputStream = null + try { + // Search forward for the position of the last offset that is greater than or equal to the target offset + remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata, startPos) + val remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream) + var batch: RecordBatch = null + + def nextBatch(): RecordBatch = { + batch = remoteLogInputStream.nextBatch() + batch + } + + while (nextBatch() != null) { + if (batch.maxTimestamp >= timestamp && batch.lastOffset >= startingOffset) { + batch.iterator.asScala.foreach(record => { + if (record.timestamp >= timestamp && record.offset >= startingOffset) + return Some(new TimestampAndOffset(record.timestamp, record.offset, maybeLeaderEpoch(batch.partitionLeaderEpoch))) + }) + } + } + None + } finally { + Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream") + } + } + + private def maybeLeaderEpoch(leaderEpoch: Int): Optional[Integer] = { + if (leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH) + Optional.empty() + else + Optional.of(leaderEpoch) + } + + /** + * Search the message offset in the remote storage based on timestamp and offset. + * + * This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules: + * + * - If there is no messages in the remote storage, return None + * - If all the messages in the remote storage have smaller offsets, return None + * - If all the messages in the remote storage have smaller timestamps, return None + * - If all the messages in the remote storage have larger timestamps, or no message in the remote storage has a timestamp + * the returned offset will be max(the earliest offset in the remote storage, startingOffset) and the timestamp will + * be Message.NoTimestamp. + * - Otherwise, return an option of TimestampOffset. The offset is the offset of the first message whose timestamp + * is greater than or equals to the target timestamp and whose offset is greater than or equals to the startingOffset. + * + * @param timestamp The timestamp to search for. + * @param startingOffset The starting offset to search. + * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there is no such message. + */ + def findOffsetByTimestamp(tp: TopicPartition, + timestamp: Long, + startingOffset: Long, + leaderEpochCache: LeaderEpochFileCache): Option[TimestampAndOffset] = { + val topicId = topicPartitionIds.get(tp) + if (topicId == null) { + throw new KafkaException("Topic id does not exist for topic partition: " + tp) + } + // Get the respective epoch in which the starting offset exists. 
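+    // Walk the epoch chain starting from that epoch: scan the remote segments of each epoch and return the first match, otherwise advance to the next epoch.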
+ var maybeEpoch = leaderEpochCache.epochForOffset(startingOffset); + while (maybeEpoch.nonEmpty) { + remoteLogMetadataManager.listRemoteLogSegments(new TopicIdPartition(topicId, tp), maybeEpoch.get).asScala + .foreach(rlsMetadata => + if (rlsMetadata.maxTimestampMs() >= timestamp && rlsMetadata.endOffset() >= startingOffset) { + val timestampOffset = lookupTimestamp(rlsMetadata, timestamp, startingOffset) + if (timestampOffset.isDefined) + return timestampOffset + } + ) + + // Move to the next epoch if not found with the current epoch. + maybeEpoch = leaderEpochCache.findNextEpoch(maybeEpoch.get) + } + None + } + + /** + * Returns the leader epoch checkpoint by truncating with the given start(exclusive) and end(inclusive) offset + * @param leaderEpochCache leader-epoch checkpoint cache. + * @param startOffset The start offset of the checkpoint file (exclusive in the truncation). + * If start offset is 6, then it will retain an entry at offset 6. + * @param endOffset The end offset of the checkpoint file (inclusive in the truncation) + * If end offset is 100, then it will remove the entries greater than or equal to 100. + * @return the truncated leader epoch checkpoint + */ + private[remote] def getLeaderEpochCheckpoint(leaderEpochCache: Option[LeaderEpochFileCache], startOffset: Long, endOffset: Long): InMemoryLeaderEpochCheckpoint = { + val checkpoint = new InMemoryLeaderEpochCheckpoint() + leaderEpochCache + .map(cache => cache.writeTo(checkpoint)) + .foreach { x => + if (startOffset >= 0) { + x.truncateFromStart(startOffset) + } + x.truncateFromEnd(endOffset) + } + checkpoint + } + + class InMemoryLeaderEpochCheckpoint extends LeaderEpochCheckpoint { + private var epochs: Seq[EpochEntry] = Seq() + override def write(epochs: Iterable[EpochEntry]): Unit = this.epochs = epochs.toSeq + override def read(): Seq[EpochEntry] = this.epochs + + def readAsByteBuffer(): ByteBuffer = { + val stream = new ByteArrayOutputStream() + val writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8)) + val writeBuffer = new CheckpointWriteBuffer[EpochEntry](writer, 0, LeaderEpochCheckpointFile.Formatter) + try { + writeBuffer.write(epochs.asJava) + writer.flush() + ByteBuffer.wrap(stream.toByteArray) + } finally { + writer.close() + } + } + } + + /** + * Closes and releases all the resources like RemoterStorageManager and RemoteLogMetadataManager. 
+ */ + def close(): Unit = { + this synchronized { + if (!closed) { + Utils.closeQuietly(remoteLogStorageManager, "RemoteLogStorageManager") + Utils.closeQuietly(remoteLogMetadataManager, "RemoteLogMetadataManager") + closed = true + } + } + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 492cec425e342..32249acc1e215 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -90,18 +90,24 @@ abstract class AbstractFetcherThread(name: String, protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] - protected def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] + protected def fetchEpochEndOffsetsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] - protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long + protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) - protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long + protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) protected val isOffsetForLeaderEpochSupported: Boolean protected val isTruncationOnFetchSupported: Boolean + protected def buildRemoteLogAuxState(partition: TopicPartition, + currentLeaderEpoch: Int, + fetchOffset: Long, + epochForFetchOffset: Int, + leaderLogStartOffset: Long): Unit + override def shutdown(): Unit = { initiateShutdown() inLock(partitionMapLock) { @@ -209,7 +215,7 @@ abstract class AbstractFetcherThread(name: String, * occur during truncation. */ private def truncateToEpochEndOffsets(latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit = { - val endOffsets = fetchEpochEndOffsets(latestEpochsForPartitions) + val endOffsets = fetchEpochEndOffsetsFromLeader(latestEpochsForPartitions) //Ensure we hold a lock during truncation. inLock(partitionMapLock) { //Check no leadership and no leader epoch changes happened whilst we were unlocked, fetching epochs @@ -396,7 +402,10 @@ abstract class AbstractFetcherThread(name: String, case Errors.OFFSET_OUT_OF_RANGE => if (handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch)) partitionsWithError += topicPartition - + case Errors.OFFSET_MOVED_TO_TIERED_STORAGE => + if (handleOffsetMovedToTieredStorage(topicPartition, currentFetchState, + fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset())) + partitionsWithError += topicPartition case Errors.UNKNOWN_LEADER_EPOCH => debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " + s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.") @@ -656,7 +665,10 @@ abstract class AbstractFetcherThread(name: String, /** * Handle a partition whose offset is out of range and return a new fetch offset. 
*/ - protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = { + private def fetchOffsetAndApplyFun(topicPartition: TopicPartition, + topicId: Option[Uuid], + currentLeaderEpoch: Int, + truncateAndBuild: => (Int, Long) => Unit) : PartitionFetchState = { val replicaEndOffset = logEndOffset(topicPartition) /** @@ -669,7 +681,7 @@ abstract class AbstractFetcherThread(name: String, * * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now. */ - val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition, currentLeaderEpoch) + val (_, leaderEndOffset) = fetchLatestOffsetFromLeader(topicPartition, currentLeaderEpoch) if (leaderEndOffset < replicaEndOffset) { warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " + s"leader's latest offset $leaderEndOffset") @@ -700,13 +712,13 @@ abstract class AbstractFetcherThread(name: String, * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset * and the current leader's log start offset. */ - val leaderStartOffset = fetchEarliestOffsetFromLeader(topicPartition, currentLeaderEpoch) + val (epoch, leaderStartOffset) = fetchEarliestOffsetFromLeader(topicPartition, currentLeaderEpoch) warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " + s"leader's start offset $leaderStartOffset") val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset) // Only truncate log when current leader's log start offset is greater than follower's log end offset. if (leaderStartOffset > replicaEndOffset) - truncateFullyAndStartAt(topicPartition, leaderStartOffset) + truncateAndBuild(epoch, leaderStartOffset) val initialLag = leaderEndOffset - offsetToFetch fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag @@ -715,6 +727,58 @@ abstract class AbstractFetcherThread(name: String, } } + /** + * Handle a partition whose offset is out of range and return a new fetch offset. + */ + protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = { + fetchOffsetAndApplyFun(topicPartition, topicId, currentLeaderEpoch, + (epoch, leaderLogStartOffset) => truncateFullyAndStartAt(topicPartition, leaderLogStartOffset)) + } + + /** + * Handle a partition whose offset is moved to tiered storage and return a new fetch offset. + */ + protected def fetchOffsetAndBuildRemoteLogAuxState(topicPartition: TopicPartition, + topicId: Option[Uuid], + currentLeaderEpoch: Int, + leaderLogStartOffset: Long): PartitionFetchState = { + fetchOffsetAndApplyFun(topicPartition, topicId, currentLeaderEpoch, + (offsetEpoch, leaderLocalLogStartOffset) => + buildRemoteLogAuxState(topicPartition, currentLeaderEpoch, leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset)) + } + + /** + * Handle the offset moved to tiered storage error. + * + * Return false if + * 1) it is able to build the required remote log auxiliary state, or + * 2) it was fenced and this thread has not received the new epoch, + * which means we need not back off and retry. Return true if there was a retriable error.
+ */ + private def handleOffsetMovedToTieredStorage(topicPartition: TopicPartition, + fetchState: PartitionFetchState, + requestEpoch: Optional[Integer], + leaderLogStartOffset: Long): Boolean = { + try { + val newFetchState = fetchOffsetAndBuildRemoteLogAuxState(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch, leaderLogStartOffset) + partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) + info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " + + s"moved to remote tier. Reset fetch offset to ${newFetchState.fetchOffset}") + false + } catch { + case _: FencedLeaderEpochException => + onPartitionFenced(topicPartition, requestEpoch) + case e @ (_ : UnknownTopicOrPartitionException | + _ : UnknownLeaderEpochException | + _ : NotLeaderOrFollowerException) => + info(s"Could not build remote log auxiliary state for $topicPartition due to error: ${e.getMessage}") + true + case e: Throwable => + error(s"Error building remote log auxiliary state for $topicPartition", e) + true + } + } + def delayPartitions(partitions: Iterable[TopicPartition], delay: Long): Unit = { partitionMapLock.lockInterruptibly() try { diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 08710308eebf3..98480d091e516 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -22,11 +22,11 @@ import java.util import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException} - import kafka.cluster.Broker.ServerInfo import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator} import kafka.log.LogManager +import kafka.log.remote.RemoteLogManager import kafka.metrics.KafkaYammerMetrics import kafka.network.SocketServer import kafka.raft.RaftManager @@ -42,12 +42,13 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils} -import org.apache.kafka.common.{ClusterResource, Endpoint} +import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException} import org.apache.kafka.metadata.{BrokerState, VersionRange} import org.apache.kafka.raft.RaftConfig.AddressSpec import org.apache.kafka.raft.{RaftClient, RaftConfig} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.ApiMessageAndVersion +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.snapshot.SnapshotWriter import scala.collection.{Map, Seq} @@ -109,6 +110,7 @@ class BrokerServer( var logDirFailureChannel: LogDirFailureChannel = null var logManager: LogManager = null + var remoteLogManager: Option[RemoteLogManager] = None var tokenManager: DelegationTokenManager = null @@ -203,6 +205,8 @@ class BrokerServer( logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true) + remoteLogManager = createRemoteLogManager(config) + // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update. 
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically. tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) @@ -263,6 +267,7 @@ class BrokerServer( time = time, scheduler = kafkaScheduler, logManager = logManager, + remoteLogManager = remoteLogManager, quotaManagers = quotaManagers, metadataCache = metadataCache, logDirFailureChannel = logDirFailureChannel, @@ -421,6 +426,9 @@ class BrokerServer( // Log static broker configurations. new KafkaConfig(config.originals(), true) + // Start RemoteLogManager before broker start serving the requests. + remoteLogManager.foreach(_.startup()) + // Enable inbound TCP connections. socketServer.startProcessingRequests(authorizerFutures) @@ -438,6 +446,19 @@ class BrokerServer( } } + protected def createRemoteLogManager(config: KafkaConfig): Option[RemoteLogManager] = { + val remoteLogManagerConfig = new RemoteLogManagerConfig(config) + if (remoteLogManagerConfig.enableRemoteStorageSystem()) { + if(config.logDirs.size > 1) { + throw new KafkaException("Tiered storage is not supported with multiple log dirs."); + } + + Some(new RemoteLogManager(remoteLogManagerConfig, config.brokerId, config.logDirs.head)) + } else { + None + } + } + override def shutdown(): Unit = { if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return try { @@ -511,6 +532,11 @@ class BrokerServer( if (logManager != null) CoreUtils.swallow(logManager.shutdown(), this) + // Close remote log manager before stopping processing requests, to give a change to any + // of its underlying clients (especially in RemoteStorageManager and RemoteLogMetadataManager) + // to close gracefully. + CoreUtils.swallow(remoteLogManager.foreach(_.close()), this) + if (quotaManagers != null) CoreUtils.swallow(quotaManagers.shutdown(), this) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 416895e317ef7..d9182f03e8932 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -17,11 +17,6 @@ package kafka.server -import java.io.{File, IOException} -import java.net.{InetAddress, SocketTimeoutException} -import java.util.concurrent._ -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} - import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0, KAFKA_2_4_IV1} import kafka.cluster.{Broker, EndPoint} import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, InconsistentClusterIdException} @@ -29,6 +24,7 @@ import kafka.controller.KafkaController import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator} import kafka.log.LogManager +import kafka.log.remote.RemoteLogManager import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics} import kafka.network.{RequestChannel, SocketServer} import kafka.security.CredentialProvider @@ -47,11 +43,16 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils} -import org.apache.kafka.common.{Endpoint, Node} +import org.apache.kafka.common.{Endpoint, KafkaException, Node} import org.apache.kafka.metadata.BrokerState import org.apache.kafka.server.authorizer.Authorizer +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import 
org.apache.zookeeper.client.ZKClientConfig +import java.io.{File, IOException} +import java.net.{InetAddress, SocketTimeoutException} +import java.util.concurrent._ +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ @@ -115,6 +116,7 @@ class KafkaServer( var logDirFailureChannel: LogDirFailureChannel = null @volatile private var _logManager: LogManager = null + var remoteLogManager: Option[RemoteLogManager] = None @volatile private var _replicaManager: ReplicaManager = null var adminManager: ZkAdminManager = null @@ -260,6 +262,8 @@ class KafkaServer( _brokerState = BrokerState.RECOVERY logManager.startup(zkClient.getAllTopicsInCluster()) + remoteLogManager = createRemoteLogManager(config) + metadataCache = MetadataCache.zkMetadataCache(config.brokerId) // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update. // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically. @@ -389,6 +393,9 @@ class KafkaServer( new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) + // Start RemoteLogManager before broker start serving the requests. + remoteLogManager.foreach(_.startup()) + /* start processing requests */ val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache) @@ -459,6 +466,19 @@ class KafkaServer( } } + protected def createRemoteLogManager(config: KafkaConfig): Option[RemoteLogManager] = { + val remoteLogManagerConfig = new RemoteLogManagerConfig(config) + if (remoteLogManagerConfig.enableRemoteStorageSystem()) { + if(config.logDirs.size > 1) { + throw new KafkaException("Tiered storage is not supported with multiple log dirs."); + } + + Some(new RemoteLogManager(remoteLogManagerConfig, config.brokerId, config.logDirs.head)) + } else { + None + } + } + protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = { new ReplicaManager( metrics = metrics, @@ -466,6 +486,7 @@ class KafkaServer( time = time, scheduler = kafkaScheduler, logManager = logManager, + remoteLogManager = remoteLogManager, quotaManagers = quotaManagers, metadataCache = metadataCache, logDirFailureChannel = logDirFailureChannel, @@ -758,6 +779,11 @@ class KafkaServer( if (kafkaController != null) CoreUtils.swallow(kafkaController.shutdown(), this) + // Close remote log manager before stopping processing requests, to give a change to any + // of its underlying clients (especially in RemoteStorageManager and RemoteLogMetadataManager) + // to close gracefully. 
+ CoreUtils.swallow(remoteLogManager.foreach(_.close()), this) + if (featureChangeListener != null) CoreUtils.swallow(featureChangeListener.close(), this) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 2ce33c838aac8..48818d5134cb5 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -19,16 +19,18 @@ package kafka.server import kafka.api.Request import kafka.cluster.BrokerEndPoint -import kafka.log.{LeaderOffsetIncremented, LogAppendInfo} +import kafka.log.{LeaderOffsetIncremented, LogAppendInfo, UnifiedLog} import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} import kafka.server.QuotaFactory.UnboundedQuota -import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils} +import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition, Uuid} + import java.util import java.util.Optional import scala.collection.{Map, Seq, Set, mutable} @@ -159,22 +161,35 @@ class ReplicaAlterLogDirsThread(name: String, } } - override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = { + override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { val partition = replicaMgr.getPartitionOrException(topicPartition) - partition.localLogOrException.logStartOffset + val log = partition.localLogOrException + val offset = log.logStartOffset + doFetchEpochAndOffset(log, offset) } - override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = { + override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { val partition = replicaMgr.getPartitionOrException(topicPartition) - partition.localLogOrException.logEndOffset + val log = partition.localLogOrException + val offset = log.logEndOffset + doFetchEpochAndOffset(log, offset) + } + + private def doFetchEpochAndOffset(log: UnifiedLog, offset: Long) = { + val leaderEpochFileCache = log.leaderEpochCache.getOrElse(throw new KafkaException("No leader epoch cache exists for partition: " + log.topicPartition)) + leaderEpochFileCache.epochForOffset(offset) match { + case Some(epoch) => (epoch, offset) + case None => (RecordBatch.NO_PARTITION_LEADER_EPOCH, offset) + } } /** * Fetches offset for leader epoch from local replica for each given topic partitions + * * @param partitions map of topic partition -> leader epoch of the future replica * @return map of topic partition -> end offset for a requested leader epoch */ - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + override def fetchEpochEndOffsetsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { partitions.map { case (tp, epochData) => try { val endOffset = if 
(epochData.leaderEpoch == UNDEFINED_EPOCH) { @@ -311,4 +326,11 @@ class ReplicaAlterLogDirsThread(name: String, } } + override protected def buildRemoteLogAuxState(partition: TopicPartition, currentLeaderEpoch: Int, fetchOffset: Long, epochForFetchOffset: Int, leaderLogStartOffset: Long): Unit = { + // JBOD is not supported with tiered storage. + truncateFullyAndStartAt(partition, fetchOffset) + replicaMgr.futureLocalLogOrException(partition) + .maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented) + } + } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 57d89dc3d7e62..efcc80b8ed7fb 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,31 +17,34 @@ package kafka.server -import java.util.Collections -import java.util.Optional - import kafka.api._ import kafka.cluster.BrokerEndPoint -import kafka.log.{LeaderOffsetIncremented, LogAppendInfo} -import kafka.server.AbstractFetcherThread.ReplicaFetch -import kafka.server.AbstractFetcherThread.ResultWithPartitions +import kafka.log.{LeaderOffsetIncremented, LogAppendInfo, UnifiedLog} +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.server.checkpoints.LeaderEpochCheckpointFile +import kafka.server.epoch.EpochEntry import kafka.utils.Implicits._ import org.apache.kafka.clients.FetchSessionHandler -import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} -import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic -import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic, OffsetForLeaderTopicCollection} import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.{LogContext, Time} - -import scala.jdk.CollectionConverters._ +import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} +import org.apache.kafka.server.common.CheckpointFile.CheckpointReadBuffer +import org.apache.kafka.server.log.remote.storage.{RemoteStorageException, RemoteStorageManager} + +import java.io.{BufferedReader, InputStream, InputStreamReader} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, StandardCopyOption} +import java.util.{Collections, Optional} import scala.collection.{Map, mutable} import scala.compat.java8.OptionConverters._ +import scala.jdk.CollectionConverters._ class ReplicaFetcherThread(name: String, fetcherId: Int, @@ -72,7 +75,8 @@ class ReplicaFetcherThread(name: String, // Visible for testing private[server] val fetchRequestVersion: 
Short = - if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_1_IV0) 13 + if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_2_IV0) 14 + else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_1_IV0) 13 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_7_IV1) 12 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_3_IV1) 11 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV2) 10 @@ -95,7 +99,8 @@ class ReplicaFetcherThread(name: String, // Visible for testing private[server] val listOffsetRequestVersion: Short = - if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_0_IV1) 7 + if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_2_IV0) 8 + else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_0_IV1) 7 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_2_IV1) 5 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 4 @@ -224,8 +229,11 @@ class ReplicaFetcherThread(name: String, } val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse] if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) { - // If we had a session topic ID related error, throw it, otherwise return an empty fetch data map. - if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR) { + // If we had a session topic ID related error or unknown topic id or inconsistent topic id, throw it, + // otherwise return an empty fetch data map. + if (fetchResponse.error == Errors.UNKNOWN_TOPIC_ID || + fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR || + fetchResponse.error == Errors.INCONSISTENT_TOPIC_ID) { throw Errors.forCode(fetchResponse.error().code()).exception() } else { Map.empty @@ -235,22 +243,25 @@ class ReplicaFetcherThread(name: String, } } - override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { - fetchOffsetFromLeader(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP) + override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = { + if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_1_IV0) + fetchOffsetFromLeader(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) + else + fetchOffsetFromLeader(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP) } - override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = { fetchOffsetFromLeader(topicPartition, currentLeaderEpoch, ListOffsetsRequest.LATEST_TIMESTAMP) } - private def fetchOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int, earliestOrLatest: Long): Long = { + private def fetchOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int, earliestOrLatest: Long): (Int, Long) = { val topic = new ListOffsetsTopic() .setName(topicPartition.topic) .setPartitions(Collections.singletonList( - new ListOffsetsPartition() - .setPartitionIndex(topicPartition.partition) - .setCurrentLeaderEpoch(currentLeaderEpoch) - .setTimestamp(earliestOrLatest))) + new ListOffsetsPartition() + .setPartitionIndex(topicPartition.partition) + .setCurrentLeaderEpoch(currentLeaderEpoch) + .setTimestamp(earliestOrLatest))) val requestBuilder = 
ListOffsetsRequest.Builder.forReplica(listOffsetRequestVersion, replicaId) .setTargetTimes(Collections.singletonList(topic)) @@ -259,12 +270,12 @@ class ReplicaFetcherThread(name: String, val responsePartition = response.topics.asScala.find(_.name == topicPartition.topic).get .partitions.asScala.find(_.partitionIndex == topicPartition.partition).get - Errors.forCode(responsePartition.errorCode) match { + Errors.forCode(responsePartition.errorCode) match { case Errors.NONE => if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) - responsePartition.offset + (responsePartition.leaderEpoch, responsePartition.offset ) else - responsePartition.oldStyleOffsets.get(0) + (responsePartition.leaderEpoch, responsePartition.oldStyleOffsets.get(0)) case error => throw error.exception } } @@ -340,7 +351,7 @@ class ReplicaFetcherThread(name: String, partition.truncateFullyAndStartAt(offset, isFuture = false) } - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + override def fetchEpochEndOffsetsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { if (partitions.isEmpty) { debug("Skipping leaderEpoch request since all partitions do not have an epoch") @@ -386,11 +397,109 @@ class ReplicaFetcherThread(name: String, } /** - * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, - * the quota is exceeded and the replica is not in sync. + * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, + * the quota is exceeded and the replica is not in sync. */ private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = { !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded } + /** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. 
+ */ + override protected def buildRemoteLogAuxState(partition: TopicPartition, + currentLeaderEpoch: Int, + leaderLocalLogStartOffset: Long, + epochForLeaderLocalLogStartOffset: Int, + leaderLogStartOffset: Long): Unit = { + + def fetchEarlierEpochEndOffset(epoch:Int): EpochEndOffset = { + val previousEpoch = epoch - 1 + // Find the end-offset for the epoch earlier to the given epoch from the leader + val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition()) + .setCurrentLeaderEpoch(currentLeaderEpoch) + .setLeaderEpoch(previousEpoch)) + val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition) + if (maybeEpochEndOffset.isEmpty) { + throw new KafkaException("No response received for partition: " + partition); + } + + val epochEndOffset = maybeEpochEndOffset.get + if (epochEndOffset.errorCode() != Errors.NONE.code()) { + throw Errors.forCode(epochEndOffset.errorCode()).exception() + } + + epochEndOffset + } + + replicaMgr.localLog(partition).foreach { log => + if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) { + replicaMgr.remoteLogManager.foreach { rlm => + + // Find the respective leader epoch for (leaderLogStartOffset - 1) + val highestOffsetInRemoteFromLeader = leaderLogStartOffset - 1 + val targetEpoch: Int = { + // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) + // will have the same epoch. + if(epochForLeaderLocalLogStartOffset == 0) { + epochForLeaderLocalLogStartOffset + } else { + // Fetch the earlier epoch/end-offset from the leader. + val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset) + // Check if the target offset lies with in the range of earlier epoch + if (earlierEpochEndOffset.endOffset >= highestOffsetInRemoteFromLeader) + earlierEpochEndOffset.leaderEpoch() // This gives the respective leader epoch, will handle any gaps in epochs + else epochForLeaderLocalLogStartOffset + } + } + + val rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, highestOffsetInRemoteFromLeader) + + if (rlsMetadata.isPresent) { + val epochStream = rlm.storageManager().fetchIndex(rlsMetadata.get(), RemoteStorageManager.IndexType.LEADER_EPOCH) + val epochs = readLeaderEpochCheckpoint(epochStream) + + // Truncate the existing local log before restoring the leader epoch cache and producer snapshots. + truncateFullyAndStartAt(partition, leaderLocalLogStartOffset) + + log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented) + epochs.foreach { epochEntry => + log.leaderEpochCache.map(cache => cache.assign(epochEntry.epoch, epochEntry.startOffset)) + } + info(s"Updated the epoch cache from remote tier till offset: $leaderLocalLogStartOffset " + + s"with size: ${epochs.size} for $partition") + + // Restore producer snapshot + val snapshotFile = UnifiedLog.producerSnapshotFile(log.dir, leaderLocalLogStartOffset) + Files.copy(rlm.storageManager().fetchIndex(rlsMetadata.get(), RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT), + snapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING) + log.producerStateManager.reloadSnapshots() + log.loadProducerState(leaderLocalLogStartOffset, reloadFromCleanShutdown = false) + info(s"Built the leader epoch cache and producer snapshots from remote tier for $partition. 
" + + s"Active producers: ${log.producerStateManager.activeProducers.size}, LeaderLogStartOffset: $leaderLogStartOffset") + } else { + throw new RemoteStorageException(s"Couldn't build the state from remote store for partition: $partition, " + + s"currentLeaderEpoch: $currentLeaderEpoch, leaderLocalLogStartOffset: $leaderLocalLogStartOffset, " + + s"leaderLogStartOffset: $leaderLogStartOffset, epoch: $targetEpoch as the previous remote log segment " + + s"metadata was not found") + } + } + } else { + // Truncate the existing local log and start from leader's localLogStartOffset. + truncateFullyAndStartAt(partition, leaderLocalLogStartOffset) + } + } + } + + private def readLeaderEpochCheckpoint(stream: InputStream): collection.Seq[EpochEntry] = { + val bufferedReader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8)) + try { + val readBuffer = new CheckpointReadBuffer[EpochEntry]("", bufferedReader, 0, LeaderEpochCheckpointFile.Formatter) + readBuffer.read().asScala.toSeq + } finally { + bufferedReader.close() + } + } + } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 9f8d9233ade44..f17da0cfecd78 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -27,6 +27,7 @@ import kafka.cluster.{BrokerEndPoint, Partition} import kafka.common.RecordValidationException import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log._ +import kafka.log.remote.RemoteLogManager import kafka.metrics.KafkaMetricsGroup import kafka.server.{FetchMetadata => SFetchMetadata} import kafka.server.HostedPartition.Online @@ -187,6 +188,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, scheduler: Scheduler, val logManager: LogManager, + val remoteLogManager: Option[RemoteLogManager] = None, quotaManagers: QuotaManagers, val metadataCache: MetadataCache, logDirFailureChannel: LogDirFailureChannel, diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFileWithFailureHandler.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFileWithFailureHandler.scala index 7021c6742caea..3ac385d7ea5e0 100644 --- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFileWithFailureHandler.scala +++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFileWithFailureHandler.scala @@ -45,7 +45,7 @@ class CheckpointFileWithFailureHandler[T](val file: File, def read(): Seq[T] = { try { - checkpointFile.read().asScala + checkpointFile.read().asScala.toSeq } catch { case e: IOException => val msg = s"Error while reading checkpoint file ${file.getAbsolutePath}" diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index e6e45fd137425..5657bb8556d05 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -169,6 +169,24 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, } } + def findPreviousEpoch(epoch: Int): Option[Int] = { + inReadLock(lock) { + Option(epochs.lowerEntry(epoch)).map(_.getKey) + } + } + + def findNextEpoch(epoch: Int): Option[Int] = { + inReadLock(lock) { + Option(epochs.higherEntry(epoch)).map(_.getKey) + } + } + + def getEpochEntry(epoch: Int): Option[EpochEntry] = { + inReadLock(lock) { + Option.apply(epochs.get(epoch)) + } + } + /** * Returns the Leader Epoch and the End Offset for a 
requested Leader Epoch. * @@ -268,6 +286,28 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, } } + def epochForOffset(offset: Long): Option[Int] = { + inReadLock(lock) { + var previousEpoch = earliestEntry.map(_.epoch) + epochs.values().asScala.foreach { + case EpochEntry(epoch, startOffset) => + if (startOffset == offset) + return Some(epoch) + if (startOffset > offset) + return previousEpoch + previousEpoch = Some(epoch) + } + previousEpoch + } + } + + def writeTo(leaderEpochCheckpoint: LeaderEpochCheckpoint): LeaderEpochFileCache = { + inReadLock(lock) { + leaderEpochCheckpoint.write(epochEntries) + new LeaderEpochFileCache(this.topicPartition, leaderEpochCheckpoint) + } + } + /** * Delete all entries. */ diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala new file mode 100644 index 0000000000000..9fb275c0e0ad3 --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote + +import java.io.{File, FileInputStream} +import java.nio.file.Files +import java.util.Collections +import kafka.log.{OffsetIndex, OffsetPosition, TimeIndex} +import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType +import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager} +import org.easymock.EasyMock +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue + +import org.easymock.EasyMock.anyObject +import org.easymock.EasyMock.expect +import org.easymock.EasyMock.reset + +class RemoteIndexCacheTest { + + val rlsm: RemoteStorageManager = EasyMock.createMock(classOf[RemoteStorageManager]) + var rlsMetadata: RemoteLogSegmentMetadata = _ + var cache: RemoteIndexCache = _ + var offsetIndex: OffsetIndex = _ + var timeIndex: TimeIndex = _ + val maxEntries = 30 + val baseOffset = 45L + + @BeforeEach + def setup(): Unit = { + offsetIndex = new OffsetIndex(createTempFile(), baseOffset, maxIndexSize = maxEntries * 8) + timeIndex = new TimeIndex(createTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12) + + appendIndexEntries() + + // fetch indexes only once to build the cache, later it should be available in the cache + expect(rlsm.fetchIndex(anyObject(classOf[RemoteLogSegmentMetadata]), EasyMock.eq(IndexType.OFFSET))) + .andReturn(new FileInputStream(offsetIndex.file)) + .times(1) + expect(rlsm.fetchIndex(anyObject(classOf[RemoteLogSegmentMetadata]), EasyMock.eq(IndexType.TIMESTAMP))) + .andReturn(new FileInputStream(timeIndex.file)) + .times(1) + expect(rlsm.fetchIndex(anyObject(classOf[RemoteLogSegmentMetadata]), EasyMock.eq(IndexType.TRANSACTION))) + .andReturn(new FileInputStream(File.createTempFile("kafka-test-", ".txnIndex"))) + .times(1) + expect(rlsm.fetchIndex(anyObject(classOf[RemoteLogSegmentMetadata]), EasyMock.eq(IndexType.PRODUCER_SNAPSHOT))) + .andReturn(new FileInputStream(File.createTempFile("kafka-test-", ".snapshot"))) + .times(1) + + EasyMock.replay(rlsm) + + val logDir = Files.createTempDirectory("kafka-").toString + cache = new RemoteIndexCache(remoteStorageManager = rlsm, logDir = logDir) + + val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) + rlsMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), + baseOffset, offsetIndex.lastOffset, -1L, 1, 1024, 1024, + Collections.singletonMap(0, 0L)) + } + + private def appendIndexEntries(): Unit = { + val curTime = System.currentTimeMillis() + for (i <- 0 until offsetIndex.maxEntries) { + val offset = offsetIndex.baseOffset + i + 1 + offsetIndex.append(offset, i) + timeIndex.maybeAppend(curTime + i, offset, skipFullCheck = true) + } + + offsetIndex.flush() + timeIndex.flush() + } + + private def assertIndexEntries(): Unit = { + for (i <- 0 until offsetIndex.maxEntries) + assertEquals(OffsetPosition(offsetIndex.baseOffset + i + 1, i), offsetIndex.entry(i)) + assertTrue(timeIndex.entries > 0) + assertTrue(timeIndex.file.exists()) + } + + def createTempFile(): File = { + val file = File.createTempFile("kafka-test-", ".tmp") + Files.delete(file.toPath) + file + } + + @AfterEach + def cleanup(): Unit = { + reset(rlsm) + + if (offsetIndex != null) offsetIndex.deleteIfExists() + if (timeIndex != null) timeIndex.deleteIfExists() + 
cache.close() + } + + @Test + def testLoadingIndexFromRemoteStorage(): Unit = { + + assertIndexEntries() + + val offsetPosition1 = offsetIndex.entry(1) + // this call should have invoked fetchOffsetIndex, fetchTimestampIndex once + val resultOffset = cache.lookupOffset(rlsMetadata, offsetPosition1.offset) + assertEquals(offsetPosition1.position, resultOffset) + + // this should not cause fetching index from RemoteLogStorageManager as it is already fetched earlier + // this is checked by setting expectation times as 1 on the mock + val offsetPosition2 = offsetIndex.entry(2) + val resultOffset2 = cache.lookupOffset(rlsMetadata, offsetPosition2.offset) + assertEquals(offsetPosition2.position, resultOffset2) + } + + @Test + def testPositionForNonExistingIndexFromRemoteStorage(): Unit = { + // offsets beyond this + val lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset) + val greaterOffsetThanLastOffset = offsetIndex.lastOffset + 1 + val resultOffsetPosition1 = cache.lookupOffset(rlsMetadata, greaterOffsetThanLastOffset) + assertEquals(lastOffsetPosition, resultOffsetPosition1) + + val nonExistentOffsetPosition = OffsetPosition(baseOffset, 0) + val lowerOffsetThanBaseOffset = offsetIndex.baseOffset - 1 + val resultOffsetPosition2 = cache.lookupOffset(rlsMetadata, lowerOffsetThanBaseOffset) + assertEquals(nonExistentOffsetPosition.position, resultOffsetPosition2) + } +} diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala new file mode 100644 index 0000000000000..df520569171ba --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote + +import kafka.server.checkpoints.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile} +import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} +import kafka.utils.TestUtils +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.AbstractConfig +import org.apache.kafka.server.log.remote.storage._ +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.api.Test + +import java.nio.file.Files +import java.util.Properties +import scala.collection.Seq + +class RemoteLogManagerTest { + + val topicPartition = new TopicPartition("test-topic", 0) + val logsDir: String = Files.createTempDirectory("kafka-").toString + val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { + private var epochs: Seq[EpochEntry] = Seq() + override def write(epochs: Iterable[EpochEntry]): Unit = this.epochs = epochs.toSeq + override def read(): Seq[EpochEntry] = this.epochs + } + val cache = new LeaderEpochFileCache(topicPartition, checkpoint) + + @Test + def testRLMConfig(): Unit = { + val key = "hello" + val rlmmConfigPrefix = "rlmm.config." + val props: Properties = new Properties() + props.put(rlmmConfigPrefix + key, "world") + props.put("remote.log.metadata.y", "z") + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, rlmmConfigPrefix) + val rlmConfig = createRLMConfig(props) + val rlmmConfig = rlmConfig.remoteLogMetadataManagerProps() + assertEquals(props.get(rlmmConfigPrefix + key), rlmmConfig.get(key)) + assertFalse(rlmmConfig.containsKey("remote.log.metadata.y")) + } + + @Test + def testGetLeaderEpochCheckpoint(): Unit = { + val epochs = Seq(EpochEntry(0, 33), EpochEntry(1, 43), EpochEntry(2, 99), EpochEntry(3, 105)) + epochs.foreach(epochEntry => cache.assign(epochEntry.epoch, epochEntry.startOffset)) + + val remoteLogManager = new RemoteLogManager(createRLMConfig(), brokerId = 1, logsDir) + + val maybeCache = Some(cache) + var actual = remoteLogManager.getLeaderEpochCheckpoint(maybeCache, startOffset = -1, endOffset = 200).read() + assertEquals(epochs.take(4), actual) + actual = remoteLogManager.getLeaderEpochCheckpoint(maybeCache, startOffset = -1, endOffset = 105).read() + assertEquals(epochs.take(3), actual) + actual = remoteLogManager.getLeaderEpochCheckpoint(maybeCache, startOffset = -1, endOffset = 100).read() + assertEquals(epochs.take(3), actual) + + actual = remoteLogManager.getLeaderEpochCheckpoint(maybeCache, startOffset = 34, endOffset = 200).read() + assertEquals(Seq(EpochEntry(0, 34)) ++ epochs.slice(1, 4), actual) + actual = remoteLogManager.getLeaderEpochCheckpoint(maybeCache, startOffset = 43, endOffset = 200).read() + assertEquals(epochs.slice(1, 4), actual) + + actual = remoteLogManager.getLeaderEpochCheckpoint(maybeCache, startOffset = 34, endOffset = 100).read() + assertEquals(Seq(EpochEntry(0, 34)) ++ epochs.slice(1, 3), actual) + actual = remoteLogManager.getLeaderEpochCheckpoint(maybeCache, startOffset = 34, endOffset = 30).read() + assertTrue(actual.isEmpty) + actual = remoteLogManager.getLeaderEpochCheckpoint(maybeCache, startOffset = 101, endOffset = 101).read() + assertTrue(actual.isEmpty) + actual = remoteLogManager.getLeaderEpochCheckpoint(maybeCache, startOffset = 101, endOffset = 102).read() + assertEquals(Seq(EpochEntry(2, 101)), actual) + } + + @Test + def testGetLeaderEpochCheckpointEncoding(): Unit = { + val epochs = Seq(EpochEntry(0, 33), EpochEntry(1, 43), EpochEntry(2, 99), EpochEntry(3, 105)) + 
epochs.foreach(epochEntry => cache.assign(epochEntry.epoch, epochEntry.startOffset)) + + val remoteLogManager = new RemoteLogManager(createRLMConfig(), brokerId = 1, logsDir) + + val tmpFile = TestUtils.tempFile() + val checkpoint = remoteLogManager.getLeaderEpochCheckpoint(Some(cache), startOffset = -1, endOffset = 200) + assertEquals(epochs, checkpoint.read()) + + Files.write(tmpFile.toPath, checkpoint.readAsByteBuffer().array()) + assertEquals(epochs, new LeaderEpochCheckpointFile(tmpFile).read()) + } + + private def createRLMConfig(props: Properties = new Properties): RemoteLogManagerConfig = { + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName) + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) + val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props) + new RemoteLogManagerConfig(config) + } + +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index 148a903187b1d..df67554f29d03 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -25,8 +25,7 @@ import kafka.cluster.BrokerEndPoint import kafka.log.LogAppendInfo import kafka.message.NoCompressionCodec import kafka.metrics.KafkaYammerMetrics -import kafka.server.AbstractFetcherThread.ReplicaFetch -import kafka.server.AbstractFetcherThread.ResultWithPartitions +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} import kafka.utils.Implicits.MapExtensionMethods import kafka.utils.TestUtils import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} @@ -392,7 +391,7 @@ class AbstractFetcherThreadTest { super.truncate(topicPartition, truncationState) } - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = + override def fetchEpochEndOffsetsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = throw new UnsupportedOperationException override protected val isOffsetForLeaderEpochSupported: Boolean = false @@ -427,7 +426,7 @@ class AbstractFetcherThreadTest { super.truncate(topicPartition, truncationState) } - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = + override def fetchEpochEndOffsetsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = throw new UnsupportedOperationException override def latestEpoch(topicPartition: TopicPartition): Option[Int] = None @@ -601,12 +600,90 @@ class AbstractFetcherThreadTest { assertEquals(0L, replicaState.highWatermark) } + @Test + def testFollowerFetchMovedToTieredStore(): Unit = { + val partition = new TopicPartition("topic", 0) + val fetcher = new MockFetcherThread() + + val replicaLog = Seq( + mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)), + mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)), + mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes))) + + val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 0L, rlmEnabled = true) + fetcher.setReplicaState(partition, 
replicaState) + fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5))) + + val leaderLog = Seq( + mkBatch(baseOffset = 5, leaderEpoch = 5, new SimpleRecord("f".getBytes)), + mkBatch(baseOffset = 6, leaderEpoch = 5, new SimpleRecord("g".getBytes)), + mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)), + mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes))) + + + val leaderState = MockFetcherThread.PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 8L, rlmEnabled = true) + // Overriding the log start offset to zero to mock the segment 0-4 moved to remote store. + leaderState.logStartOffset = 0 + fetcher.setLeaderState(partition, leaderState) + + assertEquals(3L, replicaState.logEndOffset) + val expectedState = if (truncateOnFetch) Option(Fetching) else Option(Truncating) + assertEquals(expectedState, fetcher.fetchState(partition).map(_.state)) + + fetcher.doWork() + // verify that the offset moved to tiered store error triggered and respective states are truncated to expected. + assertEquals(0L, replicaState.logStartOffset) + assertEquals(5L, replicaState.localLogStartOffset) + assertEquals(5L, replicaState.highWatermark) + assertEquals(5L, replicaState.logEndOffset) + + // Only 1 record batch is returned after a poll so calling 'n' number of times to get the desired result. + for (_ <- 1 to 4) fetcher.doWork() + assertEquals(4, replicaState.log.size) + assertEquals(0L, replicaState.logStartOffset) + assertEquals(5L, replicaState.localLogStartOffset) + assertEquals(8L, replicaState.highWatermark) + assertEquals(9L, replicaState.logEndOffset) + } + + @Test + def testFencedOffsetResetAfterMovedToRemoteTier(): Unit = { + val partition = new TopicPartition("topic", 0) + var isErrorHandled = false + val fetcher = new MockFetcherThread() { + override protected def buildRemoteLogAuxState(topicPartition: TopicPartition, leaderEpoch: Int, fetchOffset: Long, epochForFetchOffset: Int, leaderLogStartOffset: Long): Unit = { + isErrorHandled = true + throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced") + } + } + + val replicaLog = Seq( + mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)), + mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes))) + val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 2L, rlmEnabled = true) + fetcher.setReplicaState(partition, replicaState) + fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = 5))) + + val leaderLog = Seq( + mkBatch(baseOffset = 5, leaderEpoch = 5, new SimpleRecord("b".getBytes)), + mkBatch(baseOffset = 6, leaderEpoch = 5, new SimpleRecord("c".getBytes))) + val leaderState = MockFetcherThread.PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 6L, rlmEnabled = true) + fetcher.setLeaderState(partition, leaderState) + + // After the offset moved to tiered storage error, we get a fenced error and remove the partition and mark as failed + fetcher.doWork() + assertEquals(3, replicaState.logEndOffset) + assertTrue(isErrorHandled) + assertTrue(fetcher.fetchState(partition).isEmpty) + assertTrue(failedPartitions.contains(partition)) + } + @Test def testFencedOffsetResetAfterOutOfRange(): Unit = { val partition = new TopicPartition("topic", 0) var fetchedEarliestOffset = false val fetcher = new MockFetcherThread() { - override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, 
leaderEpoch: Int): Long = { + override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { fetchedEarliestOffset = true throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced") } @@ -676,7 +753,7 @@ class AbstractFetcherThreadTest { val partition = new TopicPartition("topic", 0) val fetcher: MockFetcherThread = new MockFetcherThread { val tries = new AtomicInteger(0) - override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = { + override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { if (tries.getAndIncrement() == 0) throw new UnknownLeaderEpochException("Unexpected leader epoch") super.fetchLatestOffsetFromLeader(topicPartition, leaderEpoch) @@ -770,8 +847,8 @@ class AbstractFetcherThreadTest { val fetcher = new MockFetcherThread { var fetchEpochsFromLeaderOnce = false - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { - val fetchedEpochs = super.fetchEpochEndOffsets(partitions) + override def fetchEpochEndOffsetsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + val fetchedEpochs = super.fetchEpochEndOffsetsFromLeader(partitions) if (!fetchEpochsFromLeaderOnce) { // leader epoch changes while fetching epochs from leader removePartitions(Set(partition)) @@ -817,8 +894,8 @@ class AbstractFetcherThreadTest { val nextLeaderEpochOnFollower = initialLeaderEpochOnFollower + 1 val fetcher = new MockFetcherThread { - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { - val fetchedEpochs = super.fetchEpochEndOffsets(partitions) + override def fetchEpochEndOffsetsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + val fetchedEpochs = super.fetchEpochEndOffsetsFromLeader(partitions) // leader epoch changes while fetching epochs from leader // at the same time, the replica fetcher manager removes the partition removePartitions(Set(partition)) @@ -855,9 +932,9 @@ class AbstractFetcherThreadTest { def testTruncationThrowsExceptionIfLeaderReturnsPartitionsNotRequestedInFetchEpochs(): Unit = { val partition = new TopicPartition("topic", 0) val fetcher = new MockFetcherThread { - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + override def fetchEpochEndOffsetsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { val unrequestedTp = new TopicPartition("topic2", 0) - super.fetchEpochEndOffsets(partitions).toMap + (unrequestedTp -> new EpochEndOffset() + super.fetchEpochEndOffsetsFromLeader(partitions).toMap + (unrequestedTp -> new EpochEndOffset() .setPartition(unrequestedTp.partition) .setErrorCode(Errors.NONE.code) .setLeaderEpoch(0) @@ -999,13 +1076,19 @@ class AbstractFetcherThreadTest { var leaderEpoch: Int, var logStartOffset: Long, var logEndOffset: Long, - var highWatermark: Long) + var highWatermark: Long, + var rlmEnabled: Boolean, + var localLogStartOffset: Long) object PartitionState { - def apply(log: Seq[RecordBatch], leaderEpoch: Int, highWatermark: Long): PartitionState = { + def apply(log: Seq[RecordBatch], leaderEpoch: Int, highWatermark: Long, rlmEnabled: Boolean): PartitionState = { val logStartOffset = log.headOption.map(_.baseOffset).getOrElse(0L) val logEndOffset 
= log.lastOption.map(_.nextOffset).getOrElse(0L) - new PartitionState(log.toBuffer, leaderEpoch, logStartOffset, logEndOffset, highWatermark) + new PartitionState(log.toBuffer, leaderEpoch, logStartOffset, logEndOffset, highWatermark, rlmEnabled, logStartOffset) + } + + def apply(log: Seq[RecordBatch], leaderEpoch: Int, highWatermark: Long): PartitionState = { + apply(log, leaderEpoch, highWatermark, rlmEnabled = false) } def apply(leaderEpoch: Int): PartitionState = { @@ -1123,7 +1206,11 @@ class AbstractFetcherThreadTest { override def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = { val state = replicaPartitionState(topicPartition) state.log.clear() - state.logStartOffset = offset + if (state.rlmEnabled) { + state.localLogStartOffset = offset + } else { + state.logStartOffset = offset + } state.logEndOffset = offset state.highWatermark = offset } @@ -1206,7 +1293,7 @@ class AbstractFetcherThreadTest { fetchOffset: Long, partitionState: PartitionState): Option[FetchResponseData.EpochEndOffset] = { lastFetchedEpoch.asScala.flatMap { fetchEpoch => - val epochEndOffset = fetchEpochEndOffsets( + val epochEndOffset = fetchEpochEndOffsetsFromLeader( Map(topicPartition -> new EpochData() .setPartition(topicPartition.partition) .setLeaderEpoch(fetchEpoch)))(topicPartition) @@ -1257,7 +1344,7 @@ class AbstractFetcherThreadTest { .setErrorCode(Errors.NONE.code) } - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + override def fetchEpochEndOffsetsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]() partitions.forKeyValue { (partition, epochData) => assert(partition.partition == epochData.partition, @@ -1281,9 +1368,13 @@ class AbstractFetcherThreadTest { val (error, records) = if (epochCheckError.isDefined) { (epochCheckError.get, MemoryRecords.EMPTY) - } else if (fetchData.fetchOffset > leaderState.logEndOffset || fetchData.fetchOffset < leaderState.logStartOffset) { + } else if (fetchData.fetchOffset > leaderState.logEndOffset || fetchData.fetchOffset < leaderState.localLogStartOffset) { + if (leaderState.rlmEnabled && fetchData.fetchOffset < leaderState.localLogStartOffset) { + (Errors.OFFSET_MOVED_TO_TIERED_STORAGE, MemoryRecords.EMPTY) + } else { (Errors.OFFSET_OUT_OF_RANGE, MemoryRecords.EMPTY) - } else if (divergingEpoch.nonEmpty) { + } + }else if (divergingEpoch.nonEmpty) { (Errors.NONE, MemoryRecords.EMPTY) } else { // for simplicity, we fetch only one batch at a time @@ -1319,16 +1410,22 @@ class AbstractFetcherThreadTest { } } - override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = { + override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { val leaderState = leaderPartitionState(topicPartition) checkLeaderEpochAndThrow(leaderEpoch, leaderState) - leaderState.logStartOffset + (leaderState.leaderEpoch, leaderState.localLogStartOffset) } - override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = { + override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { val leaderState = leaderPartitionState(topicPartition) checkLeaderEpochAndThrow(leaderEpoch, leaderState) - leaderState.logEndOffset + (leaderState.leaderEpoch, leaderState.logEndOffset) + } + + override 
protected def buildRemoteLogAuxState(topicPartition: TopicPartition, currentLeaderEpoch: Int, fetchOffset: Long, epochForFetchOffset: Int, leaderLogStartOffset: Long): Unit = { + truncateFullyAndStartAt(topicPartition, fetchOffset) + replicaPartitionState(topicPartition).logStartOffset = leaderLogStartOffset + // skipped building leader epoch cache and producer snapshots as they are not verified. } } diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 1988ad6afcae1..654ffb5ae8d8b 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -63,7 +63,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, replicaRequest) assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, debugReplicaRequest) - val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers) + val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 2) val replicas = zkClient.getReplicasForPartition(partition).toSet val leader = partitionToLeader(partition.partition) val follower = replicas.find(_ != leader).get @@ -112,7 +112,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { def testCurrentEpochValidation(): Unit = { val topic = "topic" val topicPartition = new TopicPartition(topic, 0) - val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers) + val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 3) val firstLeaderId = partitionToLeader(topicPartition.partition) // We need a leader change in order to check epoch fencing since the first epoch is 0 and @@ -171,7 +171,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { @Test def testResponseIncludesLeaderEpoch(): Unit = { - val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers) + val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 3) val firstLeaderId = partitionToLeader(partition.partition) TestUtils.generateAndProduceMessages(servers, topic, 9) @@ -179,6 +179,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, -1)) assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) + assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version = -1)) assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1)) assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) @@ -191,6 +192,9 @@ class ListOffsetsRequestTest extends BaseRequestTest { val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, partition, servers) // No changes to written data + assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) + assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, -1)) + assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) @@ -204,7 +208,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { @Test def 
testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = { - val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers) + val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 3) val firstLeaderId = partitionToLeader(partition.partition) TestUtils.generateAndProduceMessages(servers, topic, 9) @@ -214,18 +218,22 @@ class ListOffsetsRequestTest extends BaseRequestTest { if (version == 0) { assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort)) + assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort)) assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort)) } else if (version >= 1 && version <= 3) { assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort)) + assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort)) assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort)) } else if (version >= 4 && version <= 6) { assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort)) + assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort)) assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort)) } else if (version >= 7) { assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort)) + assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort)) assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort)) assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version.toShort)) } @@ -245,4 +253,8 @@ class ListOffsetsRequestTest extends BaseRequestTest { private def sendRequest(leaderId: Int, request: ListOffsetsRequest): ListOffsetsResponse = { connectAndReceive[ListOffsetsResponse](request, destination = brokerSocketServer(leaderId)) } + + def createTopic(numPartitions: Int, replicationFactor: Int): Map[Int, Int] = { + TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers) + } } diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestWithRemoteStoreTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestWithRemoteStoreTest.scala new file mode 100644 index 0000000000000..fa7656932ad40 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestWithRemoteStoreTest.scala @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.utils.TestUtils +import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} + +import java.util.Properties + +class ListOffsetsRequestWithRemoteStoreTest extends ListOffsetsRequestTest { + + override def brokerPropertyOverrides(props: Properties): Unit = { + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName) + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) + } + + override def createTopic(numPartitions: Int, replicationFactor: Int): Map[Int, Int] = { + val props = new Properties() + props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers, props) + } +} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index 80d69ecd5a8ba..0194a0ce62f0c 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -341,7 +341,7 @@ class ReplicaAlterLogDirsThreadTest { quota = null, brokerTopicStats = null) - val result = thread.fetchEpochEndOffsets(Map( + val result = thread.fetchEpochEndOffsetsFromLeader(Map( t1p0 -> new OffsetForLeaderPartition() .setPartition(t1p0.partition) .setLeaderEpoch(leaderEpochT1p0), @@ -405,7 +405,7 @@ class ReplicaAlterLogDirsThreadTest { quota = null, brokerTopicStats = null) - val result = thread.fetchEpochEndOffsets(Map( + val result = thread.fetchEpochEndOffsetsFromLeader(Map( t1p0 -> new OffsetForLeaderPartition() .setPartition(t1p0.partition) .setLeaderEpoch(leaderEpoch), diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index bbd9330727a89..d92322a5b368b 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -236,7 +236,7 @@ class ReplicaFetcherThreadTest { quota = null, leaderEndpointBlockingSend = Some(mockBlockingSend)) - val result = thread.fetchEpochEndOffsets(Map( + val result = thread.fetchEpochEndOffsetsFromLeader(Map( t1p0 -> new OffsetForLeaderPartition() .setPartition(t1p0.partition) .setLeaderEpoch(0), diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index 1a4a82f6fefb9..58b3c4ee8a411 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -578,4 +578,15 @@ class LeaderEpochFileCacheTest { //Then cache.truncateFromEnd(7) } + + @Test + def testGetEpochEntry(): Unit = { + cache.assign(2, 100L) + cache.assign(3, 500L) + cache.assign(5, 1000L) + + assertEquals(EpochEntry(2, 100L), cache.getEpochEntry(2).get) + assertEquals(EpochEntry(3, 500L), cache.getEpochEntry(3).get) + assertEquals(EpochEntry(5, 1000L), cache.getEpochEntry(5).get) + } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index 7f03788913722..442ea217a1334 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -96,6 +96,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import scala.Option; +import scala.Tuple2; import scala.collection.Iterator; import scala.collection.Map; @@ -348,12 +349,12 @@ public Option processPartitionData(TopicPartition topicPartition, } @Override - public long fetchEarliestOffsetFromLeader(TopicPartition topicPartition, int currentLeaderEpoch) { - return 0; + public Tuple2 fetchEarliestOffsetFromLeader(TopicPartition topicPartition, int currentLeaderEpoch) { + return Tuple2.apply(0, 0L); } @Override - public Map fetchEpochEndOffsets(Map partitions) { + public Map fetchEpochEndOffsetsFromLeader(Map partitions) { scala.collection.mutable.Map endOffsets = new scala.collection.mutable.HashMap<>(); Iterator iterator = partitions.keys().iterator(); while (iterator.hasNext()) { diff --git a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java index a1f708e3d67eb..a4cdd3a7ee2a8 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java @@ -77,29 +77,21 @@ public void write(Collection entries) throws IOException { // write to temp file and then swap with the existing file try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile()); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) { - // Write the version - writer.write(Integer.toString(version)); - writer.newLine(); - - // Write the entries count - writer.write(Integer.toString(entries.size())); - writer.newLine(); - - // Write each entry on a new line. - for (T entry : entries) { - writer.write(formatter.toString(entry)); - writer.newLine(); + CheckpointWriteBuffer checkpointWriteBuffer = new CheckpointWriteBuffer(writer, version, formatter); + try { + checkpointWriteBuffer.write(entries); + writer.flush(); + // Call `sync` before the stream is closed. 
+ fileOutputStream.getFD().sync(); + } finally { + writer.close(); } - - writer.flush(); - fileOutputStream.getFD().sync(); + Utils.atomicMoveWithFallback(tempPath, absolutePath); } - - Utils.atomicMoveWithFallback(tempPath, absolutePath); } } - public List read() throws IOException { + public Collection read() throws IOException { synchronized (lock) { try (BufferedReader reader = Files.newBufferedReader(absolutePath)) { CheckpointReadBuffer checkpointBuffer = new CheckpointReadBuffer<>(absolutePath.toString(), reader, version, formatter); @@ -108,24 +100,24 @@ public List read() throws IOException { } } - private static class CheckpointReadBuffer { + public static class CheckpointReadBuffer { private final String location; private final BufferedReader reader; private final int version; private final EntryFormatter formatter; - CheckpointReadBuffer(String location, - BufferedReader reader, - int version, - EntryFormatter formatter) { + public CheckpointReadBuffer(String location, + BufferedReader reader, + int version, + EntryFormatter formatter) { this.location = location; this.reader = reader; this.version = version; this.formatter = formatter; } - List read() throws IOException { + public List read() throws IOException { String line = reader.readLine(); if (line == null) return Collections.emptyList(); @@ -173,6 +165,36 @@ private IOException buildMalformedLineException(String line) { } } + public static class CheckpointWriteBuffer { + private final BufferedWriter writer; + private final int version; + private final EntryFormatter formatter; + + public CheckpointWriteBuffer(BufferedWriter writer, + int version, + EntryFormatter formatter) { + this.writer = writer; + this.version = version; + this.formatter = formatter; + } + + public void write(Collection entries) throws IOException { + // Write the version + writer.write(Integer.toString(version)); + writer.newLine(); + + // Write the entries count + writer.write(Integer.toString(entries.size())); + writer.newLine(); + + // Write each entry on a new line. + for (T entry : entries) { + writer.write(formatter.toString(entry)); + writer.newLine(); + } + } + } + /** * This is used to convert the given entry of type {@code T} into a string and vice versa. * diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java new file mode 100644 index 0000000000000..900d5bd5c695b --- /dev/null +++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.TopicIdPartition; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +public class NoOpRemoteLogMetadataManager implements RemoteLogMetadataManager { + @Override + public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { + return null; + } + + @Override + public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) { + return null; + } + + @Override + public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition, + int epochForOffset, + long offset) { + return Optional.empty(); + } + + @Override + public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition, int leaderEpoch) { + return Optional.empty(); + } + + @Override + public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) { + return null; + } + + @Override + public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) { + return Collections.emptyIterator(); + } + + @Override + public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, + int leaderEpoch) { + return Collections.emptyIterator(); + } + + @Override + public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions, + Set<TopicIdPartition> followerPartitions) { + } + + @Override + public void onStopPartitions(Set<TopicIdPartition> partitions) { + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map<String, ?> configs) { + } +} diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java new file mode 100644 index 0000000000000..8a83033aa0444 --- /dev/null +++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.server.log.remote.storage; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Map; + +public class NoOpRemoteStorageManager implements RemoteStorageManager { + @Override + public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, + LogSegmentData logSegmentData) { + } + + @Override + public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, + int startPosition) { + return new ByteArrayInputStream(new byte[0]); + } + + @Override + public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, + int startPosition, + int endPosition) { + return new ByteArrayInputStream(new byte[0]); + } + + @Override + public InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata, + IndexType indexType) { + return new ByteArrayInputStream(new byte[0]); + } + + @Override + public void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { + } + + @Override + public void close() { + } + + @Override + public void configure(Map configs) { + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java new file mode 100644 index 0000000000000..e716e7349abd8 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetadataManager { + private final RemoteLogMetadataManager delegate; + private final ClassLoader loader; + + public ClassLoaderAwareRemoteLogMetadataManager(RemoteLogMetadataManager delegate, + ClassLoader loader) { + this.delegate = delegate; + this.loader = loader; + } + + @Override + public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException { + return withClassLoader(() -> delegate.addRemoteLogSegmentMetadata(remoteLogSegmentMetadata)); + } + + @Override + public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException { + return withClassLoader(() -> delegate.updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate)); + } + + @Override + public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition, + int epochForOffset, + long offset) throws RemoteStorageException { + return withClassLoader(() -> delegate.remoteLogSegmentMetadata(topicIdPartition, epochForOffset, offset)); + } + + @Override + public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition, + int leaderEpoch) throws RemoteStorageException { + return withClassLoader(() -> delegate.highestOffsetForEpoch(topicIdPartition, leaderEpoch)); + } + + @Override + public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException { + return withClassLoader(() -> delegate.putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata) + ); + } + + @Override + public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException { + return withClassLoader(() -> delegate.listRemoteLogSegments(topicIdPartition)); + } + + @Override + public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, + int leaderEpoch) throws RemoteStorageException { + return withClassLoader(() -> delegate.listRemoteLogSegments(topicIdPartition, leaderEpoch)); + } + + @Override + public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions, + Set<TopicIdPartition> followerPartitions) { + withTryCatchClassLoader(() -> { + delegate.onPartitionLeadershipChanges(leaderPartitions, followerPartitions); + return null; + }); + } + + @Override + public void onStopPartitions(Set<TopicIdPartition> partitions) { + withTryCatchClassLoader(() -> { + delegate.onStopPartitions(partitions); + return null; + }); + } + + @Override + public void configure(Map<String, ?> configs) { + withTryCatchClassLoader(() -> { + delegate.configure(configs); + return null; + }); + } + + @Override + public void close() throws IOException { + ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(loader); + try { + delegate.close(); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } + + @SuppressWarnings("UnusedReturnValue") + private <T> T withTryCatchClassLoader(Worker<T> worker) { + try { + return withClassLoader(worker); + } catch (final RemoteStorageException ex) { + // ignore, this exception is not thrown by the method. + } + return null; + } + + private <T> T withClassLoader(Worker<T> worker) throws RemoteStorageException { + ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(loader); + try { + return worker.doWork(); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } + + @FunctionalInterface + public interface Worker<T> { + T doWork() throws RemoteStorageException; + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java index 1eddc0b788b16..bf77ef55385a4 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java @@ -22,8 +22,8 @@ import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.regex.Pattern; @@ -72,7 +72,7 @@ public synchronized void writeEntries(Map<Integer, Long> committedOffsets) throw } public synchronized Map<Integer, Long> readEntries() throws IOException { - List<Map.Entry<Integer, Long>> entries = checkpointFile.read(); + Collection<Map.Entry<Integer, Long>> entries = checkpointFile.read(); Map<Integer, Long> partitionToOffsets = new HashMap<>(entries.size()); for (Map.Entry<Integer, Long> entry : entries) { Long existingValue = partitionToOffsets.put(entry.getKey(), entry.getValue());
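
Note on the delegation pattern above: the RemoteLogMetadataManager implementation is a user-supplied plugin, so ClassLoaderAwareRemoteLogMetadataManager wraps every delegate call in a context-class-loader swap, installing the plugin's class loader before the call and restoring the caller's loader in a finally block. The sketch below is a minimal, self-contained illustration of that same swap, not Kafka code; the class name ContextClassLoaderSwap, the withContextClassLoader helper, and the use of Callable are assumptions made only for this example.

```java
import java.util.concurrent.Callable;

// Minimal sketch (illustrative only) of the context-class-loader swap used by
// ClassLoaderAwareRemoteLogMetadataManager: run a plugin call with the plugin's
// class loader installed, and always restore the caller's loader afterwards.
public final class ContextClassLoaderSwap {

    private ContextClassLoaderSwap() {
    }

    public static <T> T withContextClassLoader(ClassLoader pluginLoader, Callable<T> action) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(pluginLoader);
        try {
            // Reflection and ServiceLoader lookups made inside the plugin now
            // resolve against the plugin's class loader rather than the caller's.
            return action.call();
        } finally {
            // Restore unconditionally so the calling thread is never left
            // pointing at the plugin loader after the call returns.
            current.setContextClassLoader(original);
        }
    }
}
```

As in the wrapper above, the swap is applied per call rather than per instance, so the same broker thread can safely invoke plugins loaded by different class loaders.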