diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java index 262d264bac51c..2517985a00cde 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java @@ -52,7 +52,7 @@ public static OffsetSpec earliest() { } /** - * Used to retrieve the the earliest offset whose timestamp is greater than + * Used to retrieve the earliest offset whose timestamp is greater than * or equal to the given timestamp in the corresponding partition * @param timestamp in milliseconds */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ee23bfb965420..68add1712e3cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -174,7 +174,7 @@ public double measure(MetricConfig config, long now) { * @param callback The user-supplied callback to execute when the request is complete * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and - * running the the partitioner's onNewBatch method before trying to append again + * running the partitioner's onNewBatch method before trying to append again * @param nowMs The current time, in milliseconds */ public RecordAppendResult append(TopicPartition tp, diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index 1279a4d2e95ab..7121979e1eb1c 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -79,7 +79,7 @@ public class TopicConfig { public static final String MAX_MESSAGE_BYTES_DOC = "The largest record batch size allowed by Kafka (after compression if compression is enabled). " + "If this is increased and there are consumers older than 0.10.2, the consumers' fetch " + - "size must also be increased so that the they can fetch record batches this large. " + + "size must also be increased so that they can fetch record batches this large. " + "In the latest message format version, records are always grouped into batches for efficiency. " + "In previous message format versions, uncompressed records are not grouped into batches and this " + "limit only applies to a single record in that case."; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java index 04c764cb89f1e..5c44fe6190e14 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java @@ -216,7 +216,7 @@ public void testMetadataWithExistingAssignment() { Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0), Collections.emptyList(), Collections.emptyList(), 0); ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment); - // Using onJoinComplete to register the protocol selection decided by the the broker + // Using onJoinComplete to register the protocol selection decided by the broker // coordinator as well as an existing previous assignment that the call to metadata will // include with v1 but not with v0 coordinator.onJoinComplete(generationId, memberId, compatibility.protocol(), buf); @@ -247,7 +247,7 @@ public void testMetadataWithExistingAssignmentButOlderProtocolSelection() { Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0), Collections.emptyList(), Collections.emptyList(), 0); ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment); - // Using onJoinComplete to register the protocol selection decided by the the broker + // Using onJoinComplete to register the protocol selection decided by the broker // coordinator as well as an existing previous assignment that the call to metadata will // include with v1 but not with v0 coordinator.onJoinComplete(generationId, memberId, EAGER.protocol(), buf); @@ -511,7 +511,7 @@ public void testTaskAssignmentWhenWorkerBounces() { result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers); - // A rebalance after the delay expires re-assigns the lost tasks the the returning member + // A rebalance after the delay expires re-assigns the lost tasks to the returning member leaderAssignment = deserializeAssignment(result, leaderId); assertAssignment(leaderId, offset, Collections.emptyList(), 0, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index f7c1936f48b24..dbd2b7270b150 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -219,7 +219,7 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { store.stateStore.name(), store.offset, store.changelogPartition); } else { // with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file - // and hence we are uncertain the the current local state only contains committed data; + // and hence we are uncertain that the current local state only contains committed data; // in that case we need to treat it as a task-corrupted exception if (eosEnabled && !storeDirIsEmpty) { log.warn("State store {} did not find checkpoint offsets while stores are not empty, " +