KAFKA-14203: Don't make snapshots on broker after metadata errors #12596
mumrah merged 5 commits into apache:trunk
Conversation
snapshotter.foreach { snapshotter =>
  if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
  if (metadataFaultOccurred.get()) {
    trace("Not starting metadata snapshot since we previously had an error")
I went with TRACE here to avoid spamming the logs with this message on each metadata commit
Honestly I think this should be WARN or ERROR to draw our attention to it.
My concern is that in an emergency scenario when things are going badly, we could lose important logs due to log rotation if we're spamming a bunch of the same error/warn messages.
Yeah. That's fair.
I guess it's OK like it is for now. I am going to refactor this as part of the effort to unify Image handling between broker + controller.
override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
  if (metadataFaultOccurred.compareAndSet(false, true)) {
    error("Disabling metadata snapshots until this broker is restarted.")
  }
  _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
}
This abstraction feels strange. For example, how does the operator monitor that Kafka has an issue and is not generating snapshots? I assume that they need to monitor the BrokerServerMetrics.metadataLoadErrorCount metric, which is updated from KafkaRaftServer. The disabling of snapshotting happens in BrokerMetadataListener, which doesn't know about this metric.
I think the solution should make this relation explicit and not have it hidden or implemented across multiple layers of abstraction.
Agree with @jsancio. Or maybe we can just shut down the broker with an error, since the metadata cache in this broker is already wrong?
I'm not really messing with the abstraction here, nor relying on any metrics. I've just wrapped the provided FaultHandler with some additional logic to set this new boolean. Since this fault handler is getting invoked for the metadata error handling cases, it seemed like a convenient choice.
Another way I considered was to add a method on FaultHandler to let a caller know if it had been invoked. Is this more like what you are suggesting?
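That alternative could be sketched roughly as follows (hypothetical names; the `FaultHandler` interface here is a stand-in, not the actual Kafka API): a wrapper that records whether it has ever been invoked, so callers can query the fault state directly instead of the state being hidden inside the listener.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for Kafka's fault handler contract (hypothetical simplification).
interface FaultHandler {
    RuntimeException handleFault(String failureMessage, Throwable cause);
}

// Wrapper that remembers whether any fault was ever reported.
class TrackingFaultHandler implements FaultHandler {
    private final FaultHandler delegate;
    private final AtomicBoolean faultOccurred = new AtomicBoolean(false);

    TrackingFaultHandler(FaultHandler delegate) {
        this.delegate = delegate;
    }

    @Override
    public RuntimeException handleFault(String failureMessage, Throwable cause) {
        faultOccurred.set(true);                 // record that a fault happened
        return delegate.handleFault(failureMessage, cause);
    }

    // The method proposed above: lets a caller ask if this handler was ever invoked.
    boolean hasFaultOccurred() {
        return faultOccurred.get();
    }
}
```

With this shape, the snapshotter could check `hasFaultOccurred()` directly rather than relying on a separate boolean set as a side effect.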
or maybe we can just shutdown the broker with error
@showuon we currently just keep going in the face of metadata errors on the broker as a best effort to keep partition availability going. If we kill the broker on an error, we will likely see all the brokers crash. We are still working on a disaster recovery procedure, but killing the brokers would probably make that even harder. @cmccabe may have more context on this
Please read https://cwiki.apache.org/confluence/display/KAFKA/KIP-859%3A+Add+Metadata+Log+Processing+Error+Related+Metrics for a description of how we're handling metadata errors.
Keep in mind that we're not trying to revisit that KIP here, just fixing a corner case (not creating snapshots after errors)
Thanks for the info. I see. I have no more comments. Thank you
val snapshotter: Option[MetadataSnapshotter],
brokerMetrics: BrokerServerMetrics,
metadataLoadingFaultHandler: FaultHandler
_metadataLoadingFaultHandler: FaultHandler
This is unrelated to this diff but while reviewing this code, I noticed that we don't handle or count errors when generating a MetadataImage. Are these errors fatal? If these errors are fatal shouldn't errors when applying the image and delta to the rest of the broker (log manager, replica manager, etc.) also be fatal? cc @niket-goel
The errors handled by this FaultHandler are non-fatal. AFAIK only the active controller will experience fatal metadata errors. For BrokerMetadataListener, we just log an ERROR and increment the metrics.
We call this fault handler in three cases in BrokerMetadataListener:
- Replaying a record within a committed batch
- Replaying a record within a batch from a snapshot
- Finalizing a snapshot
Failures in this code seem to be fatal given the latest code:
kafka/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala, lines 323 to 330 in cc18580
Ok, I see what you mean. It does stand to reason that an error when handling a record would lead to an error when building the MetadataImage itself. Seems like we could cover the delta.apply errors with the same non-fatal fault handler.
yes, the apply errors should be handled by metadataLoadingFaultHandler as well. If apply fails we obviously cannot apply the update at all, so we should have a catch block with something like throw metadataLoadingFaultHandler.handleFault("delta apply error", ...)
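The apply-then-publish flow being proposed can be modeled in miniature (hypothetical, simplified types; `applyAndPublish` and its fields are illustrative names, not the actual BrokerMetadataListener API): if `apply()` throws, the fault handler is invoked and the exception is rethrown, so control never falls through to publish a stale image paired with the new delta.

```java
// Miniature model of the proposed fix: report the fault AND rethrow on a
// failed apply, so the previous image is never re-published.
class ApplyThenPublish {
    interface Delta { String apply(); }   // stand-in for MetadataDelta
    interface FaultHandler { RuntimeException handleFault(String msg, Throwable cause); }

    private final FaultHandler faultHandler;
    String publishedImage = "empty-image";   // last image handed to the publisher

    ApplyThenPublish(FaultHandler faultHandler) {
        this.faultHandler = faultHandler;
    }

    void applyAndPublish(Delta delta) {
        String image;
        try {
            image = delta.apply();
        } catch (Throwable t) {
            // Report the fault AND rethrow, so we never fall through to publish.
            throw faultHandler.handleFault("Error applying metadata delta", t);
        }
        publishedImage = image;   // "publish" only when apply() succeeded
    }
}
```

The key property is that a failed apply leaves `publishedImage` at its previous value rather than pairing the old image with the new delta.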
}

// This publish call is done with its own try-catch and fault handler
publisher.publish(delta, _image)
Is this correct? If there was an error in _image = _delta.apply(), _image will be the previous image that was published while delta is the new _delta that was not applied. Also, note that I am pretty sure that the code publisher.publish assumes that this layer doesn't send duplicate deltas and images.
Is there a way we can write tests for this code and scenarios so that we can increase our confidence that this code behaves as we expect?
Thanks, good catch. I missed a throw in the catch block above. If we can't apply the delta we should not publish the image.
I agree that more tests would be very useful as we harden this code path. I'll see what I can come up with for this PR and we can continue adding more tests after 3.3
val delta = _delta
_image = _delta.apply()
try {
  _image = _delta.apply()
Note that it is possible for _delta to include a lot of batches, maybe even the entire log. I wonder whether, if the broker encounters an error applying a delta, we should instead rewind, then generate and apply a delta per record batch.
Rewinding and re-applying does sound useful for some kind of automatic error mitigation, but I think it will be quite a bit of work. As it stands, I believe the broker can only process metadata going forward.
I can think of a degenerate case we have today where loadBatches is able to process all but one record, but delta.apply cannot complete and so we can't publish any new metadata. Like you mention, I think the only way to mitigate a situation like this would be to produce smaller deltas to reduce the blast radius of a bad record.
import org.apache.kafka.snapshot.SnapshotReader

import java.util.concurrent.atomic.AtomicBoolean
This comment applies to this code:
try {
  delta.replay(highestMetadataOffset, epoch, messageAndVersion.message())
} catch {
  case e: Throwable => snapshotName match {
    case None => metadataLoadingFaultHandler.handleFault(
      s"Error replaying metadata log record at offset ${_highestOffset}", e)
    case Some(name) => metadataLoadingFaultHandler.handleFault(
      s"Error replaying record ${index} from snapshot ${name} at offset ${_highestOffset}", e)
  }
}

I think this code attempts to read and replay the entire committed log. I wonder if this code should be more conservative: if it encounters an error replaying a record, only read the current batch before updating the image.
Note that this code is used for both snapshots and log segments. For snapshots, the entire snapshot needs to be in one delta update.
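One possible interpretation of that more conservative, per-batch approach can be sketched like this (hypothetical, simplified types; `Batch` and `loadBatches` are illustrative names): apply the image one batch at a time, so a bad record only blocks progress from its own batch onward instead of invalidating one huge delta.

```java
import java.util.List;

// Sketch of per-batch loading: keep the last image that applied cleanly and
// stop at the first batch that fails, rather than losing the whole delta.
class PerBatchLoader {
    interface Batch { String applyTo(String image); }   // stand-in for a record batch

    // Returns the last image that applied cleanly; stops at the first bad batch.
    static String loadBatches(String image, List<Batch> batches) {
        for (Batch batch : batches) {
            try {
                image = batch.applyTo(image);   // apply one batch at a time
            } catch (Throwable t) {
                break;                          // keep the last good image
            }
        }
        return image;
    }
}
```

For snapshots this decomposition would not help, since (as noted above) the entire snapshot needs to go into one delta update.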
_image = _delta.apply()
} catch {
  case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
  case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
What is your expectation if this throws? I ask this because at this point _delta and _image are not in a coherent or consistent state.
I think _image and _delta will be unchanged if the apply call throws. The publish will raise an exception back to the StartPublishingEvent, HandleCommitEvent, or HandleSnapshotEvent. We should not see the _image get updated (unless of course a future snapshot could apply without error)
@mumrah and I spoke offline about this. How about documenting the state of the broker at this point?
Looking at the documentation for FaultHandler it looks like it is allowed for handleFault to return null.
Hm, yeah, it does look like we can return null. In that case I believe we will see a NullPointerException. The EventQueue should still keep going in this case.
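That corner case is easy to demonstrate in isolation (hypothetical `FaultHandler` stand-in, not the actual Kafka interface): in Java, `throw expr` raises a NullPointerException when `expr` evaluates to null, so a null-returning handler turns the `throw handler.handleFault(...)` pattern into an NPE rather than the intended exception.

```java
// Demonstrates that `throw handler.handleFault(...)` becomes an NPE
// when handleFault returns null.
class NullFaultDemo {
    interface FaultHandler { RuntimeException handleFault(String msg, Throwable cause); }

    // Runs the throw-the-handler-result pattern and reports what actually got thrown.
    static RuntimeException thrownFrom(FaultHandler handler) {
        try {
            throw handler.handleFault("apply failed", null);
        } catch (RuntimeException e) {
            return e;   // either the handler's exception, or an NPE if it returned null
        }
    }
}
```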
When we encounter an error while processing metadata on the broker, we have a "best effort" strategy (log the error and continue). One problem with this is that the broker's metadata state (in MetadataImage) is no longer consistent with the quorum. If we take snapshots of this inconsistent MetadataImage, we would eventually truncate the bad parts of the log and lose the data forever. By preventing snapshots, we preserve the log to allow for some (future) remediation.
This patch adds a simple boolean that gets set when we encounter a metadata error. The boolean can only be cleared by restarting the broker.
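The mechanism can be reduced to a minimal sketch (simplified, hypothetical names; the real logic lives in BrokerMetadataListener): an AtomicBoolean flips to true on the first metadata fault, logs once via compareAndSet, and is never cleared in-process, so snapshot generation stays disabled until the broker restarts.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the snapshot gate this patch adds: once a metadata fault
// occurs, snapshots stay disabled for the lifetime of the process.
class SnapshotGate {
    private final AtomicBoolean metadataFaultOccurred = new AtomicBoolean(false);

    void onMetadataFault() {
        // compareAndSet ensures the message is logged only on the first fault,
        // not on every subsequent metadata commit.
        if (metadataFaultOccurred.compareAndSet(false, true)) {
            System.out.println("Disabling metadata snapshots until this broker is restarted.");
        }
    }

    boolean maySnapshot() {
        return !metadataFaultOccurred.get();
    }
}
```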