diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 3984f467eddab..cf7bf5aed9012 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -28,6 +28,8 @@ import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.fault.FaultHandler import org.apache.kafka.snapshot.SnapshotReader +import java.util.concurrent.atomic.AtomicBoolean + object BrokerMetadataListener { val MetadataBatchProcessingTimeUs = "MetadataBatchProcessingTimeUs" @@ -41,8 +43,22 @@ class BrokerMetadataListener( val maxBytesBetweenSnapshots: Long, val snapshotter: Option[MetadataSnapshotter], brokerMetrics: BrokerServerMetrics, - metadataLoadingFaultHandler: FaultHandler + _metadataLoadingFaultHandler: FaultHandler ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup { + + private val metadataFaultOccurred = new AtomicBoolean(false) + private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() { + override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = { + // If the broker has any kind of error handling metadata records or publishing a new image + // we will disable taking new snapshots in order to preserve the local metadata log. Once we + // encounter a metadata processing error, the broker will be in an undetermined state. + if (metadataFaultOccurred.compareAndSet(false, true)) { + error("Disabling metadata snapshots until this broker is restarted.") + } + _metadataLoadingFaultHandler.handleFault(failureMessage, cause) + } + } + private val logContext = new LogContext(s"[BrokerMetadataListener id=$brokerId] ") private val log = logContext.logger(classOf[BrokerMetadataListener]) logIdent = logContext.logPrefix() @@ -147,7 +163,9 @@ class BrokerMetadataListener( private def maybeStartSnapshot(): Unit = { snapshotter.foreach { snapshotter => - if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) { + if (metadataFaultOccurred.get()) { + trace("Not starting metadata snapshot since we previously had an error") + } else if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) { _bytesSinceLastSnapshot = 0L } } @@ -307,11 +325,21 @@ class BrokerMetadataListener( private def publish(publisher: MetadataPublisher): Unit = { val delta = _delta - _image = _delta.apply() + try { + _image = _delta.apply() + } catch { + case t: Throwable => + // If we cannot apply the delta, this publish event will throw and we will not publish a new image. + // The broker will continue applying metadata records and attempt to publish again. + throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) + } + _delta = new MetadataDelta(_image) if (isDebugEnabled) { debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.") } + + // This publish call is done with its own try-catch and fault handler publisher.publish(delta, _image) // Update the metrics since the publisher handled the lastest image diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala index 92e9fe9a2aeda..ec1ee666820ef 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.{Endpoint, Uuid} import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange} import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} -import org.apache.kafka.server.fault.MockFaultHandler +import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, Test} @@ -45,6 +45,7 @@ class BrokerMetadataListenerTest { metrics: BrokerServerMetrics = BrokerServerMetrics(new Metrics()), snapshotter: Option[MetadataSnapshotter] = None, maxBytesBetweenSnapshots: Long = 1000000L, + faultHandler: FaultHandler = metadataLoadingFaultHandler ): BrokerMetadataListener = { new BrokerMetadataListener( brokerId = 0, @@ -53,7 +54,7 @@ class BrokerMetadataListenerTest { maxBytesBetweenSnapshots = maxBytesBetweenSnapshots, snapshotter = snapshotter, brokerMetrics = metrics, - metadataLoadingFaultHandler = metadataLoadingFaultHandler + _metadataLoadingFaultHandler = faultHandler ) } @@ -168,6 +169,7 @@ class BrokerMetadataListenerTest { } private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw") + private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg") private def generateManyRecords(listener: BrokerMetadataListener, endOffset: Long): Unit = { @@ -192,6 +194,27 @@ class BrokerMetadataListenerTest { listener.getImageRecords().get() } + private def generateBadRecords(listener: BrokerMetadataListener, + endOffset: Long): Unit = { + listener.handleCommit( + RecordTestUtils.mockBatchReader( + endOffset, + 0, + util.Arrays.asList( + new ApiMessageAndVersion(new PartitionChangeRecord(). + setPartitionId(0). + setTopicId(BAR_ID). + setRemovingReplicas(Collections.singletonList(1)), 0.toShort), + new ApiMessageAndVersion(new PartitionChangeRecord(). + setPartitionId(0). + setTopicId(BAR_ID). + setRemovingReplicas(Collections.emptyList()), 0.toShort) + ) + ) + ) + listener.getImageRecords().get() + } + @Test def testHandleCommitsWithNoSnapshotterDefined(): Unit = { val listener = newBrokerMetadataListener(maxBytesBetweenSnapshots = 1000L) @@ -289,6 +312,39 @@ class BrokerMetadataListenerTest { assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should generate snapshot on feature update") } + @Test + def testNoSnapshotAfterError(): Unit = { + val snapshotter = new MockMetadataSnapshotter() + val faultHandler = new MockFaultHandler("metadata loading") + + val listener = newBrokerMetadataListener( + snapshotter = Some(snapshotter), + maxBytesBetweenSnapshots = 1000L, + faultHandler = faultHandler) + try { + val brokerIds = 0 to 3 + + registerBrokers(listener, brokerIds, endOffset = 100L) + createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L) + listener.getImageRecords().get() + assertEquals(200L, listener.highestMetadataOffset) + assertEquals(-1L, snapshotter.prevCommittedOffset) + assertEquals(-1L, snapshotter.activeSnapshotOffset) + + // Append invalid records that will normally trigger a snapshot + generateBadRecords(listener, 1000L) + assertEquals(-1L, snapshotter.prevCommittedOffset) + assertEquals(-1L, snapshotter.activeSnapshotOffset) + + // Generate some records that will not throw an error, verify still no snapshots + generateManyRecords(listener, 2000L) + assertEquals(-1L, snapshotter.prevCommittedOffset) + assertEquals(-1L, snapshotter.activeSnapshotOffset) + } finally { + listener.close() + } + } + private def registerBrokers( listener: BrokerMetadataListener, brokerIds: Iterable[Int],