Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1583,7 +1583,7 @@ private void resetToEmptyState() {
private int lastCommittedEpoch = -1;

/**
* The timestamp in milliseconds of the last batch we have committed, or -1 if we have not commmitted any offset.
* The timestamp in milliseconds of the last batch we have committed, or -1 if we have not committed any offset.
*/
private long lastCommittedTimestamp = -1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ default void write(int version, ApiMessage message) {
void write(ApiMessageAndVersion record);

/**
* Close the image writer, dicarding all progress. Calling this function more than once has
* Close the image writer, discarding all progress. Calling this function more than once has
* no effect.
*/
default void close() {
Expand Down
2 changes: 1 addition & 1 deletion raft/src/main/java/org/apache/kafka/raft/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public int hashCode() {
* @param epoch epoch of the leader that created this batch
* @param appendTimestamp timestamp in milliseconds of when the batch was appended
* @param sizeInBytes number of bytes used by this batch
* @param lastOffset offset of the last record of this batch
* @param records the list of records in this batch
*/
public static <T> Batch<T> control(
long baseOffset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class CandidateState implements EpochState {
private final Logger log;

/**
* The life time of a candidate state is the following:
* The lifetime of a candidate state is the following:
*
* 1. Once started, it would keep record of the received votes.
* 2. If majority votes granted, it can then end its life and will be replaced by a leader state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface ExpirationService {
* the provided time limit expires.
*
* @param timeoutMs the duration in milliseconds before the future is completed exceptionally
* @param <T> arbitrary future type (the service must set no expectation on the this type)
* @param <T> arbitrary future type (the service must set no expectation on this type)
* @return the completable future
*/
<T> CompletableFuture<T> failAfter(long timeoutMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public void register(Listener<T> listener) {
@Override
public void unregister(Listener<T> listener) {
pendingRegistrations.add(Registration.unregister(listener));
// No need to wakeup the polling thread. It is a removal so the updates can be
// No need to wake up the polling thread. It is a removal so the updates can be
// delayed until the polling thread wakes up for other reasons.
}

Expand Down
6 changes: 3 additions & 3 deletions raft/src/main/java/org/apache/kafka/raft/RaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ default void beginShutdown() {}
/**
* Unregisters a listener.
*
* To distinguish from events that happend before the call to {@code unregister} and a future
* To distinguish from events that happened before the call to {@code unregister} and a future
* call to {@code register}, different {@code Listener} instances must be used.
*
* If the {@code Listener} provided was never registered then the unregistration is ignored.
Expand All @@ -115,7 +115,7 @@ default void beginShutdown() {}
void unregister(Listener<T> listener);

/**
* Returns the current high water mark, or OptionalLong.empty if it is not known.
* Returns the current high watermark, or OptionalLong.empty if it is not known.
*/
OptionalLong highWatermark();

Expand Down Expand Up @@ -226,7 +226,7 @@ default void beginShutdown() {}
* @param snapshotId The ID of the new snapshot, which includes the (exclusive) last committed offset
* and the last committed epoch.
* @param lastContainedLogTime The append time of the highest record contained in this snapshot
* @return a writable snapshot if it doesn't already exists
* @return a writable snapshot if it doesn't already exist
* @throws IllegalArgumentException if the committed offset is greater than the high-watermark
* or less than the log start offset.
*/
Expand Down
4 changes: 2 additions & 2 deletions raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
* the quorum leader.
*
* @param snapshotId the end offset and epoch that identifies the snapshot
* @return a writable snapshot if it doesn't already exists
* @return a writable snapshot if it doesn't already exist
*/
Optional<RawSnapshotWriter> storeSnapshot(OffsetAndEpoch snapshotId);

Expand Down Expand Up @@ -287,7 +287,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
/**
* Returns the latest snapshot id if one exists.
*
* @return an Optional snapshot id of the latest snashot if one exists, otherwise returns an
* @return an Optional snapshot id of the latest snapshot if one exists, otherwise returns an
* empty Optional
*/
Optional<OffsetAndEpoch> latestSnapshotId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface RawSnapshotReader {
long sizeInBytes();

/**
* Creates a slize of unaligned records from the position up to a size.
* Creates a slice of unaligned records from the position up to a size.
*
* @param position the starting position of the slice in the snapshot
* @param size the maximum size of the slice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) {
Path dir = snapshotDir(logDir);

try {
// Create the snapshot directory if it doesn't exists
// Create the snapshot directory if it doesn't exist
Files.createDirectories(dir);
String prefix = String.format("%s-", filenameFromSnapshotId(snapshotId));
return Files.createTempFile(dir, prefix, PARTIAL_SUFFIX);
Expand Down