diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index 47631998fbbec..30fb695d92689 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -105,7 +105,7 @@ public void start(Map props) { } @Override - public void commit() throws InterruptedException { + public void commit() { // nop } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java index 68e6441f18fc7..e1ecb1e1dbad0 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java @@ -39,9 +39,9 @@ public class OffsetSync { new Field(TOPIC_KEY, Type.STRING), new Field(PARTITION_KEY, Type.INT32)); - private TopicPartition topicPartition; - private long upstreamOffset; - private long downstreamOffset; + private final TopicPartition topicPartition; + private final long upstreamOffset; + private final long downstreamOffset; public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) { this.topicPartition = topicPartition; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java index 600dda46f3166..9152cd5aa0b99 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java @@ -30,9 +30,9 @@ /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */ class OffsetSyncStore implements AutoCloseable { - private KafkaConsumer consumer; - private Map offsetSyncs = new HashMap<>(); - private TopicPartition offsetSyncTopicPartition; + private final KafkaConsumer consumer; + private final Map offsetSyncs = new HashMap<>(); + private final TopicPartition offsetSyncTopicPartition; OffsetSyncStore(MirrorConnectorConfig config) { consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(), diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java index 20f2ca7e2c5cc..0644d6a6c6c05 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; class Scheduler implements AutoCloseable { - private static Logger log = LoggerFactory.getLogger(Scheduler.class); + private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class); private final String name; private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); @@ -62,11 +62,11 @@ void execute(Task task, String description) { try { executor.submit(() -> executeThread(task, description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - log.warn("{} was interrupted running task: {}", name, description); + LOG.warn("{} was interrupted running task: {}", name, description); } catch (TimeoutException e) { - log.error("{} timed out running task: {}", name, description); + LOG.error("{} timed out running task: {}", name, description); } catch (Throwable e) { - log.error("{} caught exception in task: {}", name, description, e); + LOG.error("{} caught exception in task: {}", name, description, e); } } @@ -76,10 +76,10 @@ public void close() { try { boolean terminated = executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); if (!terminated) { - log.error("{} timed out during shutdown of internal scheduler.", name); + LOG.error("{} timed out during shutdown of internal scheduler.", name); } } catch (InterruptedException e) { - log.warn("{} was interrupted during shutdown of internal scheduler.", name); + LOG.warn("{} was interrupted during shutdown of internal scheduler.", name); } } @@ -92,21 +92,21 @@ private void run(Task task, String description) { long start = System.currentTimeMillis(); task.run(); long elapsed = System.currentTimeMillis() - start; - log.info("{} took {} ms", description, elapsed); + LOG.info("{} took {} ms", description, elapsed); if (elapsed > timeout.toMillis()) { - log.warn("{} took too long ({} ms) running task: {}", name, elapsed, description); + LOG.warn("{} took too long ({} ms) running task: {}", name, elapsed, description); } } catch (InterruptedException e) { - log.warn("{} was interrupted running task: {}", name, description); + LOG.warn("{} was interrupted running task: {}", name, description); } catch (Throwable e) { - log.error("{} caught exception in scheduled task: {}", name, description, e); + LOG.error("{} caught exception in scheduled task: {}", name, description, e); } } private void executeThread(Task task, String description) { Thread.currentThread().setName(name + "-" + description); if (closed) { - log.info("{} skipping task due to shutdown: {}", name, description); + LOG.info("{} skipping task due to shutdown: {}", name, description); return; } run(task, description); diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 8f692ca911612..f325e15695b6d 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -729,7 +729,7 @@ private void createTopics() { /* * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly */ - protected void warmUpConsumer(Map consumerProps) throws InterruptedException { + protected void warmUpConsumer(Map consumerProps) { Consumer dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1"); dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); dummyConsumer.commitSync();