Merged
@@ -28,6 +28,8 @@ import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.snapshot.SnapshotReader

import java.util.concurrent.atomic.AtomicBoolean

Comment (Member):

This comment applies to this code:

        try {
          delta.replay(highestMetadataOffset, epoch, messageAndVersion.message())
        } catch {
          case e: Throwable => snapshotName match {
            case None => metadataLoadingFaultHandler.handleFault(
              s"Error replaying metadata log record at offset ${_highestOffset}", e)
            case Some(name) => metadataLoadingFaultHandler.handleFault(
              s"Error replaying record ${index} from snapshot ${name} at offset ${_highestOffset}", e)
          }
        }

I think this code attempts to read and replay the entire committed log. I wonder if this code should be more conservative when it encounters an error replaying a record, and only read the current batch before updating the image.

Note that this code is used for both snapshots and log segments. For snapshots, the entire snapshot needs to be in one delta update.
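The more conservative, per-batch approach suggested here could be sketched roughly as follows. This is a toy illustration only: the `Record` type, the map-based image, and the fault callback are simplified stand-ins, not Kafka's real `MetadataDelta`/`MetadataImage` API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class PerBatchReplay {
    // Simplified stand-in for a metadata record; a null value models a
    // record that fails to replay.
    public record Record(String key, String value) {}

    // Apply one delta per batch: a bad record poisons only its own batch,
    // and every cleanly applied batch is still published.
    public static Map<String, String> replay(Map<String, String> image,
                                             List<List<Record>> batches,
                                             BiConsumer<String, Throwable> faultHandler) {
        Map<String, String> current = image;
        for (List<Record> batch : batches) {
            Map<String, String> next = new HashMap<>(current);
            try {
                for (Record r : batch) {
                    if (r.value() == null) {
                        throw new IllegalArgumentException("cannot replay record " + r.key());
                    }
                    next.put(r.key(), r.value());
                }
                current = next; // commit the delta only if the whole batch replayed
            } catch (Throwable t) {
                faultHandler.accept("Error replaying batch", t);
            }
        }
        return current;
    }
}
```

With one delta over the whole log, a single bad record would block publishing anything; per-batch, only the poisoned batch is dropped. This is also why snapshots are different: applying half a snapshot is not valid, so a snapshot must stay one delta update.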


object BrokerMetadataListener {
val MetadataBatchProcessingTimeUs = "MetadataBatchProcessingTimeUs"
@@ -41,8 +43,22 @@ class BrokerMetadataListener(
val maxBytesBetweenSnapshots: Long,
val snapshotter: Option[MetadataSnapshotter],
brokerMetrics: BrokerServerMetrics,
metadataLoadingFaultHandler: FaultHandler
_metadataLoadingFaultHandler: FaultHandler
Comment (Member):

This is unrelated to this diff, but while reviewing this code I noticed that we don't handle or count errors when generating a MetadataImage. Are these errors fatal? If they are, shouldn't errors when applying the image and delta to the rest of the broker (log manager, replica manager, etc.) also be fatal? cc @niket-goel

Comment (Member Author):

The errors handled by this FaultHandler are non-fatal. AFAIK only the active controller will experience fatal metadata errors. For BrokerMetadataListener, we just log an ERROR and increment the metrics.

We call this fault handler in three cases in BrokerMetadataListener:

  • Replaying a record within a committed batch
  • Replaying a record within a batch from a snapshot
  • Finalizing a snapshot
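The non-fatal contract described above (log an ERROR, bump a metric, keep going) can be sketched like this. This is a simplified stand-in, not Kafka's actual FaultHandler implementation; the counter name is invented for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

public class NonFatalFaultHandler {
    // Invented metric name; Kafka tracks this via BrokerServerMetrics.
    private final AtomicLong metadataLoadErrorCount = new AtomicLong();

    // Non-fatal: log and count, then hand back an exception that the caller
    // may throw (to abort one publish) or ignore (to keep replaying).
    public RuntimeException handleFault(String failureMessage, Throwable cause) {
        metadataLoadErrorCount.incrementAndGet();
        System.err.println("ERROR " + failureMessage);
        return new RuntimeException(failureMessage, cause);
    }

    public long errorCount() {
        return metadataLoadErrorCount.get();
    }
}
```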

Comment (Member):

Failures in this code seem to be fatal given the latest code:

private def publish(publisher: MetadataPublisher): Unit = {
val delta = _delta
_image = _delta.apply()
_delta = new MetadataDelta(_image)
if (isDebugEnabled) {
debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.")
}
publisher.publish(delta, _image)

Comment (Member Author):

Ok, I see what you mean. It does stand to reason that an error when handling a record would lead to an error when building the MetadataImage itself. Seems like we could cover the delta.apply errors with the same non-fatal fault handler.

Comment (Contributor @cmccabe, Sep 7, 2022):

Yes, the apply errors should be handled by metadataLoadingFaultHandler as well. If apply fails we obviously cannot apply the update at all, so we should have a catch block with something like throw metadataLoadingFaultHandler.handleFault("delta apply error", ...)

) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {

private val metadataFaultOccurred = new AtomicBoolean(false)
private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
// If the broker has any kind of error handling metadata records or publishing a new image
// we will disable taking new snapshots in order to preserve the local metadata log. Once we
// encounter a metadata processing error, the broker will be in an undetermined state.
if (metadataFaultOccurred.compareAndSet(false, true)) {
error("Disabling metadata snapshots until this broker is restarted.")
}
_metadataLoadingFaultHandler.handleFault(failureMessage, cause)
}
}

private val logContext = new LogContext(s"[BrokerMetadataListener id=$brokerId] ")
private val log = logContext.logger(classOf[BrokerMetadataListener])
logIdent = logContext.logPrefix()
@@ -147,7 +163,9 @@ class BrokerMetadataListener(

private def maybeStartSnapshot(): Unit = {
snapshotter.foreach { snapshotter =>
if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
if (metadataFaultOccurred.get()) {
trace("Not starting metadata snapshot since we previously had an error")
Comment (Member Author):

I went with TRACE here to avoid spamming the logs with this message on each metadata commit

Comment (Contributor):

Honestly I think this should be WARN or ERROR to draw our attention to it.

Comment (Member Author):

My concern is that in an emergency scenario when things are going badly, we could lose important logs due to log rotation if we're spamming a bunch of the same error/warn messages.

Comment (Contributor):

Yeah. That's fair.

I guess it's OK like it is for now. I am going to refactor this as part of the effort to unify Image handling between broker + controller.
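The compromise both sides are circling, draw attention once without spamming the logs, can be sketched as a first-occurrence gate, similar in spirit to the AtomicBoolean already used for metadataFaultOccurred. A sketch only, not what this PR implements:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class FirstOccurrenceLogger {
    private final AtomicBoolean alreadyLogged = new AtomicBoolean(false);

    // The first occurrence logs at ERROR so it gets noticed; repeats drop
    // to TRACE so a message on every metadata commit cannot rotate away
    // more important log lines.
    public String nextLevel() {
        return alreadyLogged.compareAndSet(false, true) ? "ERROR" : "TRACE";
    }
}
```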

} else if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
_bytesSinceLastSnapshot = 0L
}
}
@@ -307,11 +325,21 @@ class BrokerMetadataListener(

private def publish(publisher: MetadataPublisher): Unit = {
val delta = _delta
_image = _delta.apply()
try {
_image = _delta.apply()
Comment (Member):

Note that it is possible for _delta to include a lot of batches, maybe even the entire log. I wonder whether, if the broker encounters an error applying a delta, we should instead rewind, then generate and apply a delta per record batch.

Comment (Member Author):

Rewinding and re-applying does sound useful for some kind of automatic error mitigation, but I think it will be quite a bit of work. As it stands, I believe the broker can only process metadata going forward.

I can think of a degenerate case we have today where loadBatches is able to process all but one record, but delta.apply cannot complete and so we can't publish any new metadata. Like you mention, I think the only way to mitigate a situation like this would be to produce smaller deltas to reduce the blast radius of a bad record.

} catch {
case t: Throwable =>
// If we cannot apply the delta, this publish event will throw and we will not publish a new image.
// The broker will continue applying metadata records and attempt to publish again.
throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
}

_delta = new MetadataDelta(_image)
if (isDebugEnabled) {
debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.")
}

// This publish call is done with its own try-catch and fault handler
publisher.publish(delta, _image)
Comment (Member):

Is this correct? If there was an error in _image = _delta.apply(), _image will be the previous image that was published while delta is the new _delta that was not applied. Also, note that I am pretty sure that the code publisher.publish assumes that this layer doesn't send duplicate deltas and images.

Is there a way we can write tests for this code and scenarios so that we can increase our confidence that this code behaves as we expect?

Comment (Member Author @mumrah, Sep 8, 2022):

Thanks, good catch. I missed a throw in the catch block above. If we can't apply the delta we should not publish the image.

I agree that more tests would be very useful as we harden this code path. I'll see what I can come up with for this PR and we can continue adding more tests after 3.3


// Update the metrics since the publisher handled the latest image
@@ -27,7 +27,7 @@ import org.apache.kafka.common.{Endpoint, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.fault.MockFaultHandler
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, Test}

@@ -45,6 +45,7 @@ class BrokerMetadataListenerTest {
metrics: BrokerServerMetrics = BrokerServerMetrics(new Metrics()),
snapshotter: Option[MetadataSnapshotter] = None,
maxBytesBetweenSnapshots: Long = 1000000L,
faultHandler: FaultHandler = metadataLoadingFaultHandler
): BrokerMetadataListener = {
new BrokerMetadataListener(
brokerId = 0,
@@ -53,7 +54,7 @@
maxBytesBetweenSnapshots = maxBytesBetweenSnapshots,
snapshotter = snapshotter,
brokerMetrics = metrics,
metadataLoadingFaultHandler = metadataLoadingFaultHandler
_metadataLoadingFaultHandler = faultHandler
)
}

@@ -168,6 +169,7 @@ class BrokerMetadataListenerTest {
}

private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")

private def generateManyRecords(listener: BrokerMetadataListener,
endOffset: Long): Unit = {
@@ -192,6 +194,27 @@
listener.getImageRecords().get()
}

private def generateBadRecords(listener: BrokerMetadataListener,
endOffset: Long): Unit = {
listener.handleCommit(
RecordTestUtils.mockBatchReader(
endOffset,
0,
util.Arrays.asList(
new ApiMessageAndVersion(new PartitionChangeRecord().
setPartitionId(0).
setTopicId(BAR_ID).
setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
new ApiMessageAndVersion(new PartitionChangeRecord().
setPartitionId(0).
setTopicId(BAR_ID).
setRemovingReplicas(Collections.emptyList()), 0.toShort)
)
)
)
listener.getImageRecords().get()
}

@Test
def testHandleCommitsWithNoSnapshotterDefined(): Unit = {
val listener = newBrokerMetadataListener(maxBytesBetweenSnapshots = 1000L)
@@ -289,6 +312,39 @@
assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should generate snapshot on feature update")
}

@Test
def testNoSnapshotAfterError(): Unit = {
val snapshotter = new MockMetadataSnapshotter()
val faultHandler = new MockFaultHandler("metadata loading")

val listener = newBrokerMetadataListener(
snapshotter = Some(snapshotter),
maxBytesBetweenSnapshots = 1000L,
faultHandler = faultHandler)
try {
val brokerIds = 0 to 3

registerBrokers(listener, brokerIds, endOffset = 100L)
createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L)
listener.getImageRecords().get()
assertEquals(200L, listener.highestMetadataOffset)
assertEquals(-1L, snapshotter.prevCommittedOffset)
assertEquals(-1L, snapshotter.activeSnapshotOffset)

// Append invalid records that will normally trigger a snapshot
generateBadRecords(listener, 1000L)
assertEquals(-1L, snapshotter.prevCommittedOffset)
assertEquals(-1L, snapshotter.activeSnapshotOffset)

// Generate some records that will not throw an error, verify still no snapshots
generateManyRecords(listener, 2000L)
assertEquals(-1L, snapshotter.prevCommittedOffset)
assertEquals(-1L, snapshotter.activeSnapshotOffset)
} finally {
listener.close()
}
}

private def registerBrokers(
listener: BrokerMetadataListener,
brokerIds: Iterable[Int],