KAFKA-7395; Add fencing to replication protocol (KIP-320) #5661

Merged: hachikuji merged 10 commits into apache:trunk from hachikuji:KAFKA-7395 on Oct 5, 2018
Conversation

@hachikuji (Contributor) commented Sep 18, 2018:

This patch contains the broker-side support for the fencing improvements from KIP-320. This includes the leader epoch validation in the ListOffsets, OffsetsForLeaderEpoch, and Fetch APIs as well as the changes needed in the fetcher threads to maintain and use the current leader epoch. The client changes from KIP-320 will be left for a follow-up.

One notable change worth mentioning is that we now require the read lock in Partition in order to read from the log or to query offsets. This is necessary to ensure the safety of the leader epoch validation. Additionally, we forward all leader epoch changes to the replica fetcher thread and go through the truncation phase. This is needed to ensure the fetcher always has the latest epoch and to guarantee that we cannot miss needed truncation if we missed an epoch change.
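The leader epoch validation described above can be sketched roughly as follows. This is a minimal, hypothetical model, not the actual `Partition` code; the error names follow the KIP-320 protocol errors, but the class and method names here are invented for illustration:

```java
import java.util.Optional;

public class EpochFencing {
    enum Error { NONE, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    // currentEpoch: the broker's leader epoch for the partition.
    // requestedEpoch: the epoch the client/follower believes is current
    // (empty when an older client omits the field, in which case no fencing is possible).
    static Error validate(int currentEpoch, Optional<Integer> requestedEpoch) {
        if (!requestedEpoch.isPresent())
            return Error.NONE;
        if (requestedEpoch.get() < currentEpoch)
            return Error.FENCED_LEADER_EPOCH;   // caller has stale metadata: refresh and retry
        if (requestedEpoch.get() > currentEpoch)
            return Error.UNKNOWN_LEADER_EPOCH;  // this broker is behind: retry later
        return Error.NONE;
    }
}
```

A zombie leader that missed an epoch bump would see its fetches and offset queries rejected with FENCED_LEADER_EPOCH rather than silently serving stale data.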

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@junrao (Contributor) left a comment:

@hachikuji : Thanks for the patch. Looks good overall. A few comments below.

Also, is it true that this PR hasn't added the logic for the consumer to use OffsetForLeaderEpoch yet?

Contributor:

This is an existing issue, but readOnlyCommitted is a bit confusing given the read_committed isolation level for transactions. Perhaps rename it readUpToHighWatermark?

Comment thread on core/src/main/scala/kafka/cluster/Partition.scala (outdated)
Comment thread on core/src/main/scala/kafka/server/AbstractFetcherManager.scala (outdated)
Contributor:

The comment doesn't read right with two "either"?

Contributor:

Hmm, the leader epoch will be 1, which doesn't match the leader epoch of 0 set in line 180. Do we need to extend MockFetcherThread to support currentLeaderEpoch?

Contributor (Author):

Thanks, good catch. I meant to do this, but forgot about it.

Contributor:

Should we also assert that the offset is back to 3 now?

Contributor:

leaderEpoch of 0 doesn't seem to be consistent with the leader epoch in the log.

Contributor:

The error message should say that the leader hasn't changed yet?

@hachikuji (Contributor, Author) commented:

retest this please

@hachikuji (Contributor, Author) commented:

@junrao This is ready for another look. To answer your question, it was indeed my intention to do the client-side implementation in a separate PR. For now, I have removed the error handling in Fetcher since we are not sending the current leader epoch anyway.

@hachikuji (Contributor, Author) commented:

@lindong28 I think this PR will be wrapped up soon. Do you think we can still get it into 2.1?

@junrao (Contributor) left a comment:

@hachikuji : Thanks for the updated patch. A few more comments below.

Contributor:

This doesn't seem to be accurate. We only go to the prior epoch if the requested epoch is not present. Otherwise, the requested epoch and its last offset will be returned.
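The lookup rule described here can be modeled with a sorted map of epoch start offsets. This is a hypothetical sketch (not the actual LeaderEpochFileCache code): if the requested epoch is present, return it with its end offset (the start offset of the next epoch, or the log end offset); fall back to the prior epoch only when the requested epoch is absent.

```java
import java.util.Map;
import java.util.TreeMap;

public class EpochEndOffsetLookup {
    // epochStarts: (epoch -> start offset) entries; logEndOffset caps the last epoch.
    // Returns {epoch, endOffset}, or {-1, -1} when no epoch <= requestedEpoch exists.
    static long[] endOffsetFor(int requestedEpoch, TreeMap<Integer, Long> epochStarts,
                               long logEndOffset) {
        // Largest known epoch <= the requested one; equals requestedEpoch when present.
        Map.Entry<Integer, Long> floor = epochStarts.floorEntry(requestedEpoch);
        if (floor == null)
            return new long[]{-1L, -1L};
        // The epoch's end offset is where the next epoch begins, else the log end offset.
        Integer next = epochStarts.higherKey(floor.getKey());
        long endOffset = (next == null) ? logEndOffset : epochStarts.get(next);
        return new long[]{floor.getKey(), endOffset};
    }
}
```

With epoch starts {0 -> 0, 2 -> 10} and log end offset 20, a request for epoch 2 returns (2, 20), while a request for the absent epoch 1 falls back to (0, 10).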

Comment thread on core/src/main/scala/kafka/log/Log.scala (outdated)
Contributor:

The default for minOneMessage has changed to true. This means that the caller convertToOffsetMetadata() will fetch one message even though it doesn't need to.

Contributor:

Should leaderEpoch be 0 to match what's in line 163?

Contributor:

Hmm, not sure that I understand the "unless" part.

Contributor:

Should we verify that tp doesn't exist in firstLeaderFetcher? Also, should we call EasyMock.verify(fetcher) at the end?

val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
    metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.brokerEndPoint(config.interBrokerListenerName),
    partition.getReplica().get.highWatermark.messageOffset)).toMap
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
Contributor:

The issue of not calling replicaFetcherManager.removeFetcherForPartitions() in line 1278 is that in the case of controlled shutdown, we avoid adding the partitions to the fetcher. This means that existing partitions won't be removed from the fetcher. This may cause replicas removed from ISR during controlled shutdown to be added back to ISR again.

Also, if we do this, the state-change logging after replicaFetcherManager.removeFetcherForPartitions() probably needs to be moved too.
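The hazard described here can be illustrated with a toy model (hypothetical names, not the ReplicaManager code): if removal is skipped and controlled shutdown suppresses the re-add, the old partition stays registered in the fetcher.

```java
import java.util.HashSet;
import java.util.Set;

public class FetcherStateModel {
    // Toy model of a fetcher's partition set during a makeFollower transition.
    // removeFirst models calling removeFetcherForPartitions() unconditionally;
    // leaderAvailable is false when the new leader is shutting down (controlled shutdown),
    // in which case the partitions are not re-added to the fetcher.
    static Set<String> transition(Set<String> fetching, Set<String> partitionsToMakeFollower,
                                  boolean leaderAvailable, boolean removeFirst) {
        Set<String> result = new HashSet<>(fetching);
        if (removeFirst)
            result.removeAll(partitionsToMakeFollower); // always drop old fetch state
        if (leaderAvailable)
            result.addAll(partitionsToMakeFollower);    // re-add only if the leader is alive
        return result;
    }
}
```

With leaderAvailable = false and removeFirst = false, a partition already in the fetching set is never dropped, modeling how a replica removed from the ISR during controlled shutdown could keep fetching and be added back to the ISR.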

Contributor (Author):

Ok, let's revert this change. I think it was not strictly needed and I was not too happy about the additional bookkeeping in AbstractFetcherManager.

@lindong28 (Member) commented:

@hachikuji Would users be able to benefit from this PR if we do not implement the client-side part of KIP-320? Strictly speaking, we are not supposed to commit a large PR after the feature freeze date of Oct 1st. I am trying to understand whether the benefit of this PR is worth breaking this plan.

@hachikuji (Contributor, Author) commented:

@lindong28 The main benefit is the improved fencing on the brokers. Without it, we will still have the possibility of data loss when brokers turn zombie. These edge cases are rare in practice, so probably not too much damage if it slips, though it would be kind of a pity since we already bumped the protocols.

@lindong28 (Member) commented:

@hachikuji I see. Sure, if you and @junrao are confident in this PR, please feel free to commit it into 2.1 branch as well :)

@hachikuji (Contributor, Author) commented:

@lindong28 Sounds good. I feel pretty good about it, but let's see what Jun thinks tomorrow after he's had a chance to see the latest updates.

@junrao (Contributor) left a comment:

@hachikuji : Thanks for the latest patch. LGTM. Just a few minor comments below.

Since we already bumped up the request version and this patch seems less risky, I am fine with merging this to 2.1.

expectDeletedFiles)
}

def readLog(log: Log, startOffset: Long, maxLength: Int,
Contributor:

Could this be private?

}

fetcherThread.addPartitions(initialOffsetAndEpochs)
info(s"Added fetcher to broker ${brokerAndFetcherId.broker} for partitions $initialOffsetAndEpochs")
Contributor:

The original version logs one partition per line. Perhaps that's easier to parse when debugging?

Contributor (Author):

Hmm.. I've found the original message to be too big to be useful in practice. This was an attempt to break it down so that at least there would be a separate message per broker. It was also a tad annoying that we had to build a whole new collection just to print a log message.

Contributor:

Ok, we can keep it this way then.

partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch,
isr, 1, replicas, true), 0))

val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true)
Contributor:

Hmm, I am not sure how this test works now. setupPartitionWithMocks() creates a Log that's different from the one created in line 105. So not sure how they have the same log end offset. Also, do we need both Log?

Contributor (Author):

Hmm yeah, this is strange. I think it works because even though they are separate Log instances, they use the same directory and log files. Let me try to fix.

@hachikuji (Contributor, Author) commented:

@junrao Thanks for reviewing. I will plan to merge to trunk and 2.1 once the build completes.

@hachikuji (Contributor, Author) commented:

The SuppressionIntegrationTest failure is known to be flaky. I will go ahead and merge to trunk and 2.1.

@hachikuji hachikuji merged commit ed3bd79 into apache:trunk Oct 5, 2018
hachikuji added a commit that referenced this pull request Oct 5, 2018
This patch contains the broker-side support for the fencing improvements from KIP-320. This includes the leader epoch validation in the ListOffsets, OffsetsForLeaderEpoch, and Fetch APIs as well as the changes needed in the fetcher threads to maintain and use the current leader epoch. The client changes from KIP-320 will be left for a follow-up.

One notable change worth mentioning is that we now require the read lock in `Partition` in order to read from the log or to query offsets. This is necessary to ensure the safety of the leader epoch validation. Additionally, we forward all leader epoch changes to the replica fetcher thread and go through the truncation phase. This is needed to ensure the fetcher always has the latest epoch and to guarantee that we cannot miss needed truncation if we missed an epoch change.

Reviewers: Jun Rao <junrao@gmail.com>
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019