
MINOR: Add RaftReplicaManager #10069

Merged
cmccabe merged 21 commits into apache:trunk from rondagostino:kip500_ReplicaManager4Raft
Feb 10, 2021

Conversation

@rondagostino (Contributor):

This adds the logic to apply partition metadata when consuming from the Raft-based metadata log.

RaftReplicaManager extends ReplicaManager for now to minimize changes to existing code for the 2.8 release. We will likely adjust this hierarchy at a later time (e.g. introducing a trait and adding a helper to refactor common code). For now, we expose the necessary fields and methods in ReplicaManager by changing their scope from private to protected, and we refactor out a couple of pieces of logic that are shared between the two implementations (stopping replicas and adding log dir fetchers).

Existing tests are sufficient to expose regressions in the current ReplicaManager.

We intend to exercise the new RaftReplicaManager code via system tests and unit/integration tests (both to come in later PRs).
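The inheritance approach the description outlines can be sketched roughly as follows. This is a heavily simplified illustration with invented names (`ReplicaManagerBase`, `RaftVariant`, `stopReplicas` here stand in for the real classes and helpers), not the actual PR code: previously-private state is widened to protected, and shared logic is pulled into a protected helper both variants can call.

```scala
// Hypothetical, simplified sketch of the hierarchy described above.
// Real class and method names in the PR differ.
class ReplicaManagerBase {
  // was private; widened to protected so the Raft variant can reuse it
  protected var highWatermark: Long = 0L

  // refactored shared logic, callable from both subclasses
  protected def stopReplicas(partitions: Set[String]): Set[String] =
    partitions // pretend each partition was stopped successfully
}

class RaftVariant extends ReplicaManagerBase {
  def applyMetadata(partitions: Set[String]): Set[String] =
    stopReplicas(partitions) // reuses the refactored shared helper
}
```

The alternative mentioned (a trait plus a helper object) would avoid widening access modifiers, which is why the PR flags this hierarchy as temporary.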

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Comment thread core/src/main/scala/kafka/server/DelayedDeleteRecords.scala Outdated
cmccabe added the kraft label Feb 5, 2021
val partitionsMadeLeader = makeLeaders(partitionsAlreadyExisting, leaderPartitionStates,
  highWatermarkCheckpoints, -1, mostRecentMetadataOffsets)
val partitionsMadeFollower = makeFollowers(partitionsAlreadyExisting,
  createMetadataBrokersFromCurrentCache, followerPartitionStates,
rondagostino (Contributor Author):

> Do we have any guarantee that the metadata cache is in a state that is consistent with the deferred changes?

Good question. Metadata changes up through the point of applying the deferred partition metadata changes should be applied to the metadata cache at this point.

One thing we need to think about is the fact that we currently don't defer metadata cache changes at all. The metadata cache will contain partition states that are ahead of ReplicaManager during the time when ReplicaManager is deferring its changes. This means, for example, that the following will reflect deferred partition changes that have been applied to the metadata cache but that have not been applied to ReplicaManager. We may have to write test cases for each of these conditions so we can be clear on what the expected behavior should be.

  • MetadataRequest
  • FindCoordinatorRequest
  • ElectLeadersRequest with topicPartitions = null
  • DelayedCreatePartitions (in topic purgatory)
  • DelayedElectLeader (in elect leader purgatory)
  • Anything that calls ReplicaManager.fetchMessages() and DelayedFetch (in fetch purgatory), though these seem okay since they wait until they can get enough data?
  • TransactionMarkerChannelManager.addTxnMarkersToBrokerQueue
  • DescribeConfigsRequest
  • OffsetCommitRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
  • ProduceRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
  • FetchRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
  • DeleteRecordsRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
  • AddPartitionsToTxnRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
  • TxnOffsetCommitRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
  • OffsetDeleteRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
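The window being discussed can be made concrete with a small sketch. All names here are invented for illustration (the real deferral logic lives in RaftReplicaManager and is considerably more involved): the metadata cache applies every change immediately, while the replica manager only queues changes until deferral ends, so any of the requests listed above could observe partition state via the cache that the replica manager has not yet applied.

```scala
import scala.collection.mutable

// Hypothetical sketch of the deferral window described above.
final case class PartitionChange(topicPartition: String, leaderEpoch: Int)

class DeferringReplicaManager {
  private val deferred = mutable.Queue.empty[PartitionChange]
  private val applied = mutable.Map.empty[String, Int]
  var deferringChanges: Boolean = true // in 2.8, true only during startup

  def handle(change: PartitionChange): Unit =
    if (deferringChanges) deferred.enqueue(change) // cache would apply it now; we don't
    else applied(change.topicPartition) = change.leaderEpoch

  // invoked once the broker is ready to catch up
  def applyDeferred(): Unit = {
    deferred.dequeueAll(_ => true).foreach(c => applied(c.topicPartition) = c.leaderEpoch)
    deferringChanges = false
  }

  def knowsPartition(tp: String): Boolean = applied.contains(tp)
}
```

While `deferringChanges` is true, a handler that consults only the metadata cache would answer for partitions this manager does not yet know about, which is exactly the inconsistency each bullet above would need a test case for.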

rondagostino (Contributor Author):

I added the partition state with respect to deferral to MetadataPartitions. Storing the information in the metadata cache could help here.

stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions prior to the error: " +
s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s)")
// Re-throw the exception for it to be caught in BrokerMetadataListener
throw e
rondagostino (Contributor Author):

> If we fail to apply changes, I guess we have to see that as a fatal error? The only possible way of recovering would be to replay the changes.

Yeah, I think so. We may need to put effort into minimizing the blast radius of these failures.

rondagostino marked this pull request as ready for review February 5, 2021 23:42
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
@ijuma (Member) commented Feb 7, 2021:

@rondagostino Looks like a file is missing the license:

19:18:36 Execution failed for task ':rat'.
19:18:36 > Found 1 files with unknown licenses.

@ijuma (Member) commented Feb 7, 2021:

All builds are green!

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala Outdated

object MetadataPartition {
  def apply(name: String, record: PartitionRecord): MetadataPartition = {
    val OffsetNeverDeferred = 0L // must not be a valid offset we could see (i.e. must not be positive)
cmccabe (Contributor), Feb 8, 2021:

Can you add JavaDoc for this?

Also, what about NoDeferredOffset as a name?

One last question... why is this 0 and not -1? 0 is a valid offset in the log, whereas -1 is not.

rondagostino (Contributor Author):

As discussed offline, we will remove this and not include the last seen offset in log messages when applying deferred changes.
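The reviewer's point about the sentinel value is worth spelling out: 0 is a valid offset in a Kafka log, so `0L` as "never deferred" is ambiguous, whereas `-1L` can never collide with a real offset. A minimal illustration (this constant was ultimately removed from the PR, so this is only a sketch of the point, not shipped code):

```scala
// -1L is unambiguous as a sentinel because log offsets are never negative;
// 0L would collide with the first offset in a log.
val OffsetNeverDeferred = -1L

def describeDeferral(offset: Long): String =
  if (offset == OffsetNeverDeferred) "never deferred"
  else s"last deferred at offset $offset"
```

With a `0L` sentinel, a partition genuinely deferred at offset 0 would be indistinguishable from one that was never deferred.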

Collections.emptyList())
Collections.emptyList(),
largestDeferredOffsetEverSeen = deferredAtOffset.getOrElse(OffsetNeverDeferred),
isCurrentlyDeferringChanges = deferredAtOffset.isDefined)
Contributor:

Hmm... why do we need this boolean? Can't we just check if largestDeferredOffsetEverSeen is not OffsetNeverDeferred?

rondagostino (Contributor Author):

We basically use largestDeferredOffsetEverSeen only for logging at this point -- we also check it in a few private def sanityCheckState...() RaftReplicaManager methods. We could completely eliminate largestDeferredOffsetEverSeen if we didn't want to log when the partition was last deferred. It just tracks when the partition was last seen and the change at that offset was deferred rather than directly applied. Once the partition is no longer deferred the value remains whatever it was and the boolean flips to false.

It does seem on the surface that we could change the declaration to deferredSinceOffset and get rid of the boolean -- and deferredSinceOffset would change to -1 once those changes are applied. But there is a problem with this if the partition changes to not being deferred in the metadata cache before we ask RaftReplicaManager to process all of its deferred changes: the value will be -1 in the metadata cache under those circumstances, and we wouldn't have the value to log.

So I think we have a few options.

  1. Do the logging, apply the changes to the metadata cache before the replica manager, and keep the Long and Boolean as currently defined
  2. Do the logging, apply the changes to the metadata cache after the replica manager, and use just a Long (with the semantics changed as described above)
  3. Just use a Boolean and don't do the logging.
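The trade-off between options 1 and 2 can be sketched with the two field names from the diff above (the surrounding case class is invented for illustration): keeping the Long and Boolean as a pair preserves the historical offset for logging even after deferral ends, whereas collapsing them into a single `deferredSinceOffset` would reset to a sentinel on apply and lose the value.

```scala
// Illustrative only: option 1's representation. Clearing the Boolean on
// apply leaves the offset available for logging afterwards; option 2's
// single field would have to be reset to -1 here instead, losing it.
final case class DeferralState(largestDeferredOffsetEverSeen: Long,
                               isCurrentlyDeferringChanges: Boolean) {
  def afterApplying: DeferralState = copy(isCurrentlyDeferringChanges = false)
}
```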

rondagostino (Contributor Author):

As discussed offline, we will eliminate the information from the messages we log when applying deferred changes, and we won't carry that info around in MetadataPartition. Currently RaftReplicaManager knows if it is deferring changes or not. Maybe later when we get BrokerLifecycleManager and BrokerMetadataListener committed we can think about where a global boolean might live to identify if the broker is fenced or not. It isn't critical to decide right now because we are only going to defer the application of partition metadata at startup in 2.8.

@cmccabe (Contributor) commented Feb 8, 2021:

Thanks for this PR, @rondagostino !

I can see why you wanted to have RaftReplicaChangeDelegateHelper. The ReplicaManager is not very easy to unit test because it has grown so large. I don't think this delegate thing is quite the right abstraction here (it's pretty confusing), but I guess let's revisit this after 2.8 is finished.

I suppose one option is, once ReplicaManager is a pure interface, we can split the kip-500 update logic off into a separate set of functions that takes a ReplicaManager as an input. Then we can easily unit-test the update logic with a MockReplicaManager.

For now I left two small comments... LGTM after those are addressed.
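The refactoring cmccabe suggests could look roughly like this. All names here (`ReplicaManagerApi`, `applyLeaderChanges`, `MockReplicaManager`) are invented for illustration: once the replica manager sits behind an interface, the kip-500 update logic becomes a free function over that interface and can be unit-tested against a mock with no broker machinery.

```scala
// Hypothetical sketch of the post-2.8 testing approach described above.
trait ReplicaManagerApi {
  def becomeLeader(tp: String): Unit
}

// The update logic takes the interface as input rather than living inside
// a large concrete class, so it can be exercised in isolation.
def applyLeaderChanges(rm: ReplicaManagerApi, partitions: Seq[String]): Unit =
  partitions.foreach(rm.becomeLeader)

// A trivial mock records the calls it receives for later assertions.
class MockReplicaManager extends ReplicaManagerApi {
  val leaders = scala.collection.mutable.ListBuffer.empty[String]
  override def becomeLeader(tp: String): Unit = leaders += tp
}
```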

rondagostino (Contributor Author) left a comment:

@cmccabe I pushed a commit getting rid of the information from MetadataPartition and removing the information from the log messages.



Comment thread core/src/main/scala/kafka/server/RaftReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
@cmccabe (Contributor) commented Feb 9, 2021:

Thanks @rondagostino . I left two small comments... LGTM after those are addressed.

cmccabe merged commit 31a647f into apache:trunk Feb 10, 2021
cmccabe pushed a commit that referenced this pull request Feb 10, 2021
This adds the logic to apply partition metadata when consuming from the Raft-based
metadata log.

RaftReplicaManager extends ReplicaManager for now to minimize changes to existing
code for the 2.8 release. We will likely adjust this hierarchy at a later time (e.g. introducing
a trait and adding a helper to refactor common code). For now, we expose the necessary
fields and methods in ReplicaManager by changing their scope from private to protected,
and we refactor out a couple of pieces of logic that are shared between the two
implementations (stopping replicas and adding log dir fetchers).

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
ijuma added a commit to ijuma/kafka that referenced this pull request Feb 14, 2021
…e-allocations-lz4

* apache-github/trunk: (118 commits)
  KAFKA-12327: Remove MethodHandle usage in CompressionType (apache#10123)
  KAFKA-12297: Make MockProducer return RecordMetadata with values as per contract
  MINOR: Update zstd and use classes with no finalizers (apache#10120)
  KAFKA-12326: Corrected regresion in MirrorMaker 2 executable introduced with KAFKA-10021 (apache#10122)
  KAFKA-12321 the comparison function for uuid type should be 'equals' rather than '==' (apache#10098)
  MINOR: Add FetchSnapshot API doc in KafkaRaftClient (apache#10097)
  MINOR: KIP-631 KafkaConfig fixes and improvements (apache#10114)
  KAFKA-12272: Fix commit-interval metrics (apache#10102)
  MINOR: Improve confusing admin client shutdown logging (apache#10107)
  MINOR: Add BrokerMetadataListener (apache#10111)
  MINOR: Support Raft-based metadata quorums in system tests (apache#10093)
  MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. (apache#10106)
  MINOR: Introduce the KIP-500 Broker lifecycle manager (apache#10095)
  MINOR: Remove always-passing validation in TestRecordTest#testProducerRecord (apache#9930)
  KAFKA-5235: GetOffsetShell: Support for multiple topics and consumer configuration override (KIP-635) (apache#9430)
  MINOR: Prevent creating partition.metadata until ID can be written (apache#10041)
  MINOR: Add RaftReplicaManager (apache#10069)
  MINOR: Add ClientQuotaMetadataManager for processing QuotaRecord (apache#10101)
  MINOR: Rename DecommissionBrokers to UnregisterBrokers (apache#10084)
  MINOR: KafkaBroker.brokerState should be volatile instead of AtomicReference (apache#10080)
  ...

