Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static OffsetSpec earliest() {
}

/**
* Used to retrieve the the earliest offset whose timestamp is greater than
* Used to retrieve the earliest offset whose timestamp is greater than
* or equal to the given timestamp in the corresponding partition
* @param timestamp in milliseconds
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public double measure(MetricConfig config, long now) {
* @param callback The user-supplied callback to execute when the request is complete
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
* @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
* running the the partitioner's onNewBatch method before trying to append again
* running the partitioner's onNewBatch method before trying to append again
* @param nowMs The current time, in milliseconds
*/
public RecordAppendResult append(TopicPartition tp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class TopicConfig {
public static final String MAX_MESSAGE_BYTES_DOC =
"The largest record batch size allowed by Kafka (after compression if compression is enabled). " +
"If this is increased and there are consumers older than 0.10.2, the consumers' fetch " +
"size must also be increased so that the they can fetch record batches this large. " +
"size must also be increased so that they can fetch record batches this large. " +
"In the latest message format version, records are always grouped into batches for efficiency. " +
"In previous message format versions, uncompressed records are not grouped into batches and this " +
"limit only applies to a single record in that case.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void testMetadataWithExistingAssignment() {
Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0),
Collections.emptyList(), Collections.emptyList(), 0);
ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
// Using onJoinComplete to register the protocol selection decided by the the broker
// Using onJoinComplete to register the protocol selection decided by the broker
// coordinator as well as an existing previous assignment that the call to metadata will
// include with v1 but not with v0
coordinator.onJoinComplete(generationId, memberId, compatibility.protocol(), buf);
Expand Down Expand Up @@ -247,7 +247,7 @@ public void testMetadataWithExistingAssignmentButOlderProtocolSelection() {
Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0),
Collections.emptyList(), Collections.emptyList(), 0);
ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
// Using onJoinComplete to register the protocol selection decided by the the broker
// Using onJoinComplete to register the protocol selection decided by the broker
// coordinator as well as an existing previous assignment that the call to metadata will
// include with v1 but not with v0
coordinator.onJoinComplete(generationId, memberId, EAGER.protocol(), buf);
Expand Down Expand Up @@ -511,7 +511,7 @@ public void testTaskAssignmentWhenWorkerBounces() {

result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);

// A rebalance after the delay expires re-assigns the lost tasks the the returning member
// A rebalance after the delay expires re-assigns the lost tasks to the returning member
leaderAssignment = deserializeAssignment(result, leaderId);
assertAssignment(leaderId, offset,
Collections.emptyList(), 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
store.stateStore.name(), store.offset, store.changelogPartition);
} else {
// with EOS, if the previous run did not shutdown gracefully, we may have lost the checkpoint file
// and hence we are uncertain the the current local state only contains committed data;
// and hence we are uncertain that the current local state only contains committed data;
// in that case we need to treat it as a task-corrupted exception
if (eosEnabled && !storeDirIsEmpty) {
log.warn("State store {} did not find checkpoint offsets while stores are not empty, " +
Expand Down