From cc185807591e54c7abb9af91fbb847ab9a9acc8a Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 6 Sep 2022 17:46:00 -0400 Subject: [PATCH 1/5] Don't make snapshots on broker after metadata errors --- .../metadata/BrokerMetadataListener.scala | 19 +++++++++++++++++-- .../metadata/BrokerMetadataListenerTest.scala | 2 +- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 3984f467eddab..44803b9fae0be 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -28,6 +28,8 @@ import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.fault.FaultHandler import org.apache.kafka.snapshot.SnapshotReader +import java.util.concurrent.atomic.AtomicBoolean + object BrokerMetadataListener { val MetadataBatchProcessingTimeUs = "MetadataBatchProcessingTimeUs" @@ -41,8 +43,19 @@ class BrokerMetadataListener( val maxBytesBetweenSnapshots: Long, val snapshotter: Option[MetadataSnapshotter], brokerMetrics: BrokerServerMetrics, - metadataLoadingFaultHandler: FaultHandler + _metadataLoadingFaultHandler: FaultHandler ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup { + + private val metadataFaultOccurred = new AtomicBoolean(false) + private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() { + override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = { + if (metadataFaultOccurred.compareAndSet(false, true)) { + error("Disabling metadata snapshots until this broker is restarted.") + } + _metadataLoadingFaultHandler.handleFault(failureMessage, cause) + } + } + private val logContext = new LogContext(s"[BrokerMetadataListener id=$brokerId] ") private val log = logContext.logger(classOf[BrokerMetadataListener]) logIdent = logContext.logPrefix() @@ -147,7 +160,9 @@ class BrokerMetadataListener( private def maybeStartSnapshot(): Unit = { snapshotter.foreach { snapshotter => - if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) { + if (metadataFaultOccurred.get()) { + trace("Not starting metadata snapshot since we previously had an error") + } else if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) { _bytesSinceLastSnapshot = 0L } } diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala index 92e9fe9a2aeda..3de80ba59db82 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala @@ -53,7 +53,7 @@ class BrokerMetadataListenerTest { maxBytesBetweenSnapshots = maxBytesBetweenSnapshots, snapshotter = snapshotter, brokerMetrics = metrics, - metadataLoadingFaultHandler = metadataLoadingFaultHandler + _metadataLoadingFaultHandler = metadataLoadingFaultHandler ) } From ad798ca64906b8e3822f5d51e2ccfaf2e4e03648 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 7 Sep 2022 14:14:45 -0400 Subject: [PATCH 2/5] Add a test and try/catch for delta apply --- .../metadata/BrokerMetadataListener.scala | 24 +++++--- .../metadata/BrokerMetadataListenerTest.scala | 60 ++++++++++++++++++- 2 files changed, 73 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 44803b9fae0be..1d3d134a5a0ca 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -322,16 +322,22 @@ class BrokerMetadataListener( private def publish(publisher: MetadataPublisher): Unit = { val delta = _delta - _image = _delta.apply() - _delta = new MetadataDelta(_image) - if (isDebugEnabled) { - debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.") - } - publisher.publish(delta, _image) + try { + _image = _delta.apply() + _delta = new MetadataDelta(_image) + if (isDebugEnabled) { + debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.") + } - // Update the metrics since the publisher handled the lastest image - brokerMetrics.lastAppliedRecordOffset.set(_highestOffset) - brokerMetrics.lastAppliedRecordTimestamp.set(_highestTimestamp) + // This publish call is done with its own try-catch and fault handler + publisher.publish(delta, _image) + + // Update the metrics since the publisher handled the lastest image + brokerMetrics.lastAppliedRecordOffset.set(_highestOffset) + brokerMetrics.lastAppliedRecordTimestamp.set(_highestTimestamp) + } catch { + case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) + } } override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala index 3de80ba59db82..1a5dc51b8aee2 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.{Endpoint, Uuid} import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange} import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} -import org.apache.kafka.server.fault.MockFaultHandler +import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, Test} @@ -45,6 +45,7 @@ class BrokerMetadataListenerTest { metrics: BrokerServerMetrics = BrokerServerMetrics(new Metrics()), snapshotter: Option[MetadataSnapshotter] = None, maxBytesBetweenSnapshots: Long = 1000000L, + faultHandler: FaultHandler = metadataLoadingFaultHandler ): BrokerMetadataListener = { new BrokerMetadataListener( brokerId = 0, @@ -53,7 +54,7 @@ class BrokerMetadataListenerTest { maxBytesBetweenSnapshots = maxBytesBetweenSnapshots, snapshotter = snapshotter, brokerMetrics = metrics, - _metadataLoadingFaultHandler = metadataLoadingFaultHandler + _metadataLoadingFaultHandler = faultHandler ) } @@ -168,6 +169,7 @@ class BrokerMetadataListenerTest { } private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw") + private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg") private def generateManyRecords(listener: BrokerMetadataListener, endOffset: Long): Unit = { @@ -192,6 +194,27 @@ class BrokerMetadataListenerTest { listener.getImageRecords().get() } + private def generateBadRecords(listener: BrokerMetadataListener, + endOffset: Long): Unit = { + listener.handleCommit( + RecordTestUtils.mockBatchReader( + endOffset, + 0, + util.Arrays.asList( + new ApiMessageAndVersion(new PartitionChangeRecord(). + setPartitionId(0). + setTopicId(BAR_ID). + setRemovingReplicas(Collections.singletonList(1)), 0.toShort), + new ApiMessageAndVersion(new PartitionChangeRecord(). + setPartitionId(0). + setTopicId(BAR_ID). + setRemovingReplicas(Collections.emptyList()), 0.toShort) + ) + ) + ) + listener.getImageRecords().get() + } + @Test def testHandleCommitsWithNoSnapshotterDefined(): Unit = { val listener = newBrokerMetadataListener(maxBytesBetweenSnapshots = 1000L) @@ -289,6 +312,39 @@ class BrokerMetadataListenerTest { assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should generate snapshot on feature update") } + @Test + def testNoShapshotAfterError(): Unit = { + val snapshotter = new MockMetadataSnapshotter() + val faultHandler = new MockFaultHandler("metadata loading") + + val listener = newBrokerMetadataListener( + snapshotter = Some(snapshotter), + maxBytesBetweenSnapshots = 1000L, + faultHandler = faultHandler) + try { + val brokerIds = 0 to 3 + + registerBrokers(listener, brokerIds, endOffset = 100L) + createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L) + listener.getImageRecords().get() + assertEquals(200L, listener.highestMetadataOffset) + assertEquals(-1L, snapshotter.prevCommittedOffset) + assertEquals(-1L, snapshotter.activeSnapshotOffset) + + // Append invalid records that will normally trigger a snapshot + generateBadRecords(listener, 1000L) + assertEquals(-1L, snapshotter.prevCommittedOffset) + assertEquals(-1L, snapshotter.activeSnapshotOffset) + + // Generate some records that will not throw an error, verify still no snapshots + generateManyRecords(listener, 2000L) + assertEquals(-1L, snapshotter.prevCommittedOffset) + assertEquals(-1L, snapshotter.activeSnapshotOffset) + } finally { + listener.close() + } + } + private def registerBrokers( listener: BrokerMetadataListener, brokerIds: Iterable[Int], From 0ac599feed360e2993cff3d305109f86fd38f21f Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 7 Sep 2022 22:18:37 -0400 Subject: [PATCH 3/5] PR feedback --- .../metadata/BrokerMetadataListener.scala | 23 ++++++++++--------- .../metadata/BrokerMetadataListenerTest.scala | 2 +- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 1d3d134a5a0ca..b069617250dc1 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -324,20 +324,21 @@ class BrokerMetadataListener( val delta = _delta try { _image = _delta.apply() - _delta = new MetadataDelta(_image) - if (isDebugEnabled) { - debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.") - } - - // This publish call is done with its own try-catch and fault handler - publisher.publish(delta, _image) - - // Update the metrics since the publisher handled the lastest image - brokerMetrics.lastAppliedRecordOffset.set(_highestOffset) - brokerMetrics.lastAppliedRecordTimestamp.set(_highestTimestamp) } catch { case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) } + + _delta = new MetadataDelta(_image) + if (isDebugEnabled) { + debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.") + } + + // This publish call is done with its own try-catch and fault handler + publisher.publish(delta, _image) + + // Update the metrics since the publisher handled the lastest image + brokerMetrics.lastAppliedRecordOffset.set(_highestOffset) + brokerMetrics.lastAppliedRecordTimestamp.set(_highestTimestamp) } override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala index 1a5dc51b8aee2..ec1ee666820ef 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala @@ -313,7 +313,7 @@ class BrokerMetadataListenerTest { } @Test - def testNoShapshotAfterError(): Unit = { + def testNoSnapshotAfterError(): Unit = { val snapshotter = new MockMetadataSnapshotter() val faultHandler = new MockFaultHandler("metadata loading") From e1dda63c46908295ecbc164386e1c96227cafa1b Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 8 Sep 2022 18:50:00 -0400 Subject: [PATCH 4/5] Throw if we can't apply a delta --- .../scala/kafka/server/metadata/BrokerMetadataListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index b069617250dc1..15ca73771803c 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -325,7 +325,7 @@ class BrokerMetadataListener( try { _image = _delta.apply() } catch { - case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) + case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) } _delta = new MetadataDelta(_image) From 3883451fd20a94927d814cf287eed79ddb485c31 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 12 Sep 2022 16:28:33 -0400 Subject: [PATCH 5/5] Clarify broker state after error --- .../kafka/server/metadata/BrokerMetadataListener.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 15ca73771803c..cf7bf5aed9012 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -49,6 +49,9 @@ class BrokerMetadataListener( private val metadataFaultOccurred = new AtomicBoolean(false) private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() { override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = { + // If the broker has any kind of error handling metadata records or publishing a new image + // we will disable taking new snapshots in order to preserve the local metadata log. Once we + // encounter a metadata processing error, the broker will be in an undetermined state. if (metadataFaultOccurred.compareAndSet(false, true)) { error("Disabling metadata snapshots until this broker is restarted.") } @@ -325,7 +328,10 @@ class BrokerMetadataListener( try { _image = _delta.apply() } catch { - case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) + case t: Throwable => + // If we cannot apply the delta, this publish event will throw and we will not publish a new image. + // The broker will continue applying metadata records and attempt to publish again. + throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) } _delta = new MetadataDelta(_image)