Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void start(Map<String, String> props) {
}

@Override
public void commit() throws InterruptedException {
public void commit() {
// nop
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public class OffsetSync {
new Field(TOPIC_KEY, Type.STRING),
new Field(PARTITION_KEY, Type.INT32));

private TopicPartition topicPartition;
private long upstreamOffset;
private long downstreamOffset;
private final TopicPartition topicPartition;
private final long upstreamOffset;
private final long downstreamOffset;

public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
this.topicPartition = topicPartition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@

/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
class OffsetSyncStore implements AutoCloseable {
private KafkaConsumer<byte[], byte[]> consumer;
private Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
private TopicPartition offsetSyncTopicPartition;
private final KafkaConsumer<byte[], byte[]> consumer;
private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
private final TopicPartition offsetSyncTopicPartition;

OffsetSyncStore(MirrorConnectorConfig config) {
consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.slf4j.LoggerFactory;

class Scheduler implements AutoCloseable {
private static Logger log = LoggerFactory.getLogger(Scheduler.class);
private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);

private final String name;
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
Expand Down Expand Up @@ -62,11 +62,11 @@ void execute(Task task, String description) {
try {
executor.submit(() -> executeThread(task, description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("{} was interrupted running task: {}", name, description);
LOG.warn("{} was interrupted running task: {}", name, description);
} catch (TimeoutException e) {
log.error("{} timed out running task: {}", name, description);
LOG.error("{} timed out running task: {}", name, description);
} catch (Throwable e) {
log.error("{} caught exception in task: {}", name, description, e);
LOG.error("{} caught exception in task: {}", name, description, e);
}
}

Expand All @@ -76,10 +76,10 @@ public void close() {
try {
boolean terminated = executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (!terminated) {
log.error("{} timed out during shutdown of internal scheduler.", name);
LOG.error("{} timed out during shutdown of internal scheduler.", name);
}
} catch (InterruptedException e) {
log.warn("{} was interrupted during shutdown of internal scheduler.", name);
LOG.warn("{} was interrupted during shutdown of internal scheduler.", name);
}
}

Expand All @@ -92,21 +92,21 @@ private void run(Task task, String description) {
long start = System.currentTimeMillis();
task.run();
long elapsed = System.currentTimeMillis() - start;
log.info("{} took {} ms", description, elapsed);
LOG.info("{} took {} ms", description, elapsed);
if (elapsed > timeout.toMillis()) {
log.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
LOG.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
}
} catch (InterruptedException e) {
log.warn("{} was interrupted running task: {}", name, description);
LOG.warn("{} was interrupted running task: {}", name, description);
} catch (Throwable e) {
log.error("{} caught exception in scheduled task: {}", name, description, e);
LOG.error("{} caught exception in scheduled task: {}", name, description, e);
}
}

private void executeThread(Task task, String description) {
Thread.currentThread().setName(name + "-" + description);
if (closed) {
log.info("{} skipping task due to shutdown: {}", name, description);
LOG.info("{} skipping task due to shutdown: {}", name, description);
return;
}
run(task, description);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ private void createTopics() {
/*
* Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
*/
protected void warmUpConsumer(Map<String, Object> consumerProps) throws InterruptedException {
protected void warmUpConsumer(Map<String, Object> consumerProps) {
Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
dummyConsumer.commitSync();
Expand Down