From 135ec6f008ce765b23928332e6b9d4536fb233f1 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 25 May 2021 13:47:56 +0200 Subject: [PATCH 1/2] MINOR: Log more information when producer snapshot is written --- core/src/main/scala/kafka/log/Log.scala | 6 +++++- core/src/main/scala/kafka/log/LogLoader.scala | 6 +++++- .../scala/kafka/log/ProducerStateManager.scala | 11 +++++++---- .../unit/kafka/log/LogCleanerManagerTest.scala | 2 +- .../scala/unit/kafka/log/LogCleanerTest.scala | 2 +- .../scala/unit/kafka/log/LogLoaderTest.scala | 4 ++-- .../kafka/log/ProducerStateManagerTest.scala | 18 +++++++++--------- .../unit/kafka/server/ReplicaManagerTest.scala | 2 +- .../scala/unit/kafka/utils/SchedulerTest.scala | 2 +- 9 files changed, 32 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0a4851ba6842a..6ef95d1030f01 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2011,7 +2011,11 @@ object Log extends Logging { logDirFailureChannel, config.messageFormatVersion.recordVersion, s"[Log partition=$topicPartition, dir=${dir.getParent}] ") - val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) + val producerStateManager = new ProducerStateManager( + topicPartition, + dir, + maxProducerIdExpirationMs, + time) val offsets = LogLoader.load(LoadLogParams( dir, topicPartition, diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala index bfadb78f7ab5a..ef9ba000d6ffb 100644 --- a/core/src/main/scala/kafka/log/LogLoader.scala +++ b/core/src/main/scala/kafka/log/LogLoader.scala @@ -324,7 +324,11 @@ object LogLoader extends Logging { * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow */ private def recoverSegment(segment: LogSegment, params: LoadLogParams): Int = { - val producerStateManager = new ProducerStateManager(params.topicPartition, params.dir, params.maxProducerIdExpirationMs) + val producerStateManager = new ProducerStateManager( + params.topicPartition, + params.dir, + params.maxProducerIdExpirationMs, + params.time) Log.rebuildProducerState( producerStateManager, params.segments, diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index ca1982bba276d..b4e7ac1597201 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -21,7 +21,6 @@ import java.nio.ByteBuffer import java.nio.channels.FileChannel import java.nio.file.{Files, StandardOpenOption} import java.util.concurrent.ConcurrentSkipListMap - import kafka.log.Log.offsetFromFile import kafka.server.LogOffsetMetadata import kafka.utils.{Logging, nonthreadsafe, threadsafe} @@ -29,6 +28,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.types._ import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch} +import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.{ByteUtils, Crc32C} import scala.jdk.CollectionConverters._ @@ -484,7 +484,8 @@ object ProducerStateManager { @nonthreadsafe class ProducerStateManager(val topicPartition: TopicPartition, @volatile var _logDir: File, - val maxProducerIdExpirationMs: Int = 60 * 60 * 1000) extends Logging { + val maxProducerIdExpirationMs: Int = 60 * 60 * 1000, + val time: Time = Time.SYSTEM) extends Logging { import ProducerStateManager._ import java.util @@ -718,8 +719,10 @@ class ProducerStateManager(val topicPartition: TopicPartition, // If not a new offset, then it is not worth taking another snapshot if (lastMapOffset > lastSnapOffset) { val snapshotFile = SnapshotFile(Log.producerSnapshotFile(_logDir, lastMapOffset)) - info(s"Writing producer snapshot at offset $lastMapOffset") + val start = time.hiResClockMs() writeSnapshot(snapshotFile.file, producers) + info(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.") + snapshots.put(snapshotFile.offset, snapshotFile) // Update the last snap offset according to the serialized map @@ -730,7 +733,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, /** * Update the parentDir for this ProducerStateManager and all of the snapshot files which it manages. */ - def updateParentDir(parentDir: File): Unit ={ + def updateParentDir(parentDir: File): Unit = { _logDir = parentDir snapshots.forEach((_, s) => s.updateParentDir(parentDir)) } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index fbdbe207a1898..7620348264711 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -101,7 +101,7 @@ class LogCleanerManagerTest extends Logging { val maxProducerIdExpirationMs = 60 * 60 * 1000 val segments = new LogSegments(tp) val leaderEpochCache = Log.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "") - val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxProducerIdExpirationMs) + val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxProducerIdExpirationMs, time) val offsets = LogLoader.load(LoadLogParams( tpDir, tp, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index c137eab0a4194..0890b038c853b 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -106,7 +106,7 @@ class LogCleanerTest { val maxProducerIdExpirationMs = 60 * 60 * 1000 val logSegments = new LogSegments(topicPartition) val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "") - val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) + val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs, time) val offsets = LogLoader.load(LoadLogParams( dir, topicPartition, diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index d59ed1d0c4c63..4826784836303 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -95,7 +95,7 @@ class LogLoaderTest { val maxProducerIdExpirationMs = 60 * 60 * 1000 val segments = new LogSegments(topicPartition) val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "") - val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs) + val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, time) val loadLogParams = LoadLogParams(logDir, topicPartition, config, time.scheduler, time, logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint, maxProducerIdExpirationMs, leaderEpochCache, producerStateManager) @@ -265,7 +265,7 @@ class LogLoaderTest { } } val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "") - val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs) + val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, mockTime) val loadLogParams = LoadLogParams( logDir, topicPartition, diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 78510e388e8e5..5f81f59eddacf 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -46,7 +46,7 @@ class ProducerStateManagerTest { @BeforeEach def setUp(): Unit = { logDir = TestUtils.tempDir() - stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time) } @AfterEach @@ -467,7 +467,7 @@ class ProducerStateManagerTest { append(stateManager, producerId, epoch, 1, 1L, isTransactional = true) stateManager.takeSnapshot() - val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time) recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds) // The snapshot only persists the last appended batch metadata @@ -490,7 +490,7 @@ class ProducerStateManagerTest { appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.ABORT, offset = 2L) stateManager.takeSnapshot() - val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time) recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds) // The snapshot only persists the last appended batch metadata @@ -510,7 +510,7 @@ class ProducerStateManagerTest { offset = 0L, timestamp = appendTimestamp) stateManager.takeSnapshot() - val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time) recoveredMapping.truncateAndReload(logStartOffset = 0L, logEndOffset = 1L, time.milliseconds) val lastEntry = recoveredMapping.lastEntry(producerId) @@ -542,7 +542,7 @@ class ProducerStateManagerTest { append(stateManager, producerId, epoch, 1, 1L, 1) stateManager.takeSnapshot() - val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time) recoveredMapping.truncateAndReload(0L, 1L, 70000) // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Hence @@ -561,7 +561,7 @@ class ProducerStateManagerTest { append(stateManager, producerId, epoch, 1, 1L, 1) stateManager.takeSnapshot() - val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time) recoveredMapping.truncateAndReload(0L, 1L, 70000) val sequence = 2 @@ -769,7 +769,7 @@ class ProducerStateManagerTest { @Test def testSequenceNotValidatedForGroupMetadataTopic(): Unit = { val partition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) - val stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + val stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time) val epoch = 0.toShort append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 99, @@ -818,7 +818,7 @@ class ProducerStateManagerTest { appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1) stateManager.takeSnapshot() - val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time) recoveredMapping.truncateAndReload(0L, 2L, 70000) // append from old coordinator should be rejected @@ -922,7 +922,7 @@ class ProducerStateManagerTest { } // Ensure that the truncated snapshot is deleted and producer state is loaded from the previous snapshot - val reloadedStateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + val reloadedStateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time) reloadedStateManager.truncateAndReload(0L, 20L, time.milliseconds()) assertFalse(snapshotToTruncate.exists()) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index d1d38601bda93..28f7be88c0426 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -1487,7 +1487,7 @@ class ReplicaManagerTest { val maxProducerIdExpirationMs = 30000 val segments = new LogSegments(tp) val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "") - val producerStateManager = new ProducerStateManager(tp, logDir, maxProducerIdExpirationMs) + val producerStateManager = new ProducerStateManager(tp, logDir, maxProducerIdExpirationMs, time) val offsets = LogLoader.load(LoadLogParams( logDir, tp, diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala index 93362e25a9d8c..e5d2f2176c393 100644 --- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -124,7 +124,7 @@ class SchedulerTest { val logDirFailureChannel = new LogDirFailureChannel(10) val segments = new LogSegments(topicPartition) val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "") - val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs) + val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, mockTime) val offsets = LogLoader.load(LoadLogParams( logDir, topicPartition, From 4acacfc57b598b86279cb95131600081481fc87f Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 25 May 2021 17:17:29 +0200 Subject: [PATCH 2/2] Address Ismael comment --- core/src/main/scala/kafka/log/Log.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 6ef95d1030f01..76b2b7e7c6fd9 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2011,11 +2011,7 @@ object Log extends Logging { logDirFailureChannel, config.messageFormatVersion.recordVersion, s"[Log partition=$topicPartition, dir=${dir.getParent}] ") - val producerStateManager = new ProducerStateManager( - topicPartition, - dir, - maxProducerIdExpirationMs, - time) + val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs, time) val offsets = LogLoader.load(LoadLogParams( dir, topicPartition,