From f863f6fe319a97d84b5924962e6d58ad90c621c5 Mon Sep 17 00:00:00 2001 From: David Mollitor Date: Sun, 12 Jan 2020 11:29:18 -0500 Subject: [PATCH 1/2] KAFKA-9405: Use Map API computeIfAbsent Where Applicable --- .../kafka/clients/admin/KafkaAdminClient.java | 14 ++++++-------- .../runtime/distributed/ConnectProtocol.java | 14 ++------------ 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index ec896ca6d6422..0111e356b49f8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2242,11 +2242,10 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection> partitionsByBroker = new HashMap<>(); - for (TopicPartitionReplica replica: replicas) { - if (!partitionsByBroker.containsKey(replica.brokerId())) - partitionsByBroker.put(replica.brokerId(), new HashSet<>()); - partitionsByBroker.get(replica.brokerId()).add(new TopicPartition(replica.topic(), replica.partition())); - } + for (TopicPartitionReplica replica : replicas) { + partitionsByBroker.computeIfAbsent(replica.brokerId(), key -> new HashSet<>()) + .add(new TopicPartition(replica.topic(), replica.partition())); + } final long now = time.milliseconds(); for (Map.Entry> entry: partitionsByBroker.entrySet()) { @@ -2414,9 +2413,8 @@ void handleResponse(AbstractResponse abstractResponse) { } else { Node node = cluster.leaderFor(entry.getKey()); if (node != null) { - if (!leaders.containsKey(node)) - leaders.put(node, new HashMap<>()); - leaders.get(node).put(entry.getKey(), entry.getValue().beforeOffset()); + leaders.computeIfAbsent(node, key -> new HashMap<>()).put(entry.getKey(), + entry.getValue().beforeOffset()); } else { future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception()); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java index 15fc6059a627b..c167e80991fe5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java @@ -387,21 +387,11 @@ protected Map> asMap() { // Using LinkedHashMap preserves the ordering, which is helpful for tests and debugging Map> taskMap = new LinkedHashMap<>(); for (String connectorId : new HashSet<>(connectorIds)) { - Collection connectorTasks = taskMap.get(connectorId); - if (connectorTasks == null) { - connectorTasks = new ArrayList<>(); - taskMap.put(connectorId, connectorTasks); - } - connectorTasks.add(CONNECTOR_TASK); + taskMap.computeIfAbsent(connectorId, key -> new ArrayList<>()).add(CONNECTOR_TASK); } for (ConnectorTaskId taskId : taskIds) { String connectorId = taskId.connector(); - Collection connectorTasks = taskMap.get(connectorId); - if (connectorTasks == null) { - connectorTasks = new ArrayList<>(); - taskMap.put(connectorId, connectorTasks); - } - connectorTasks.add(taskId.task()); + taskMap.computeIfAbsent(connectorId, key -> new ArrayList<>()).add(taskId.task()); } return taskMap; } From 180a7404fb75058a004a02e8f5dc5721044daf69 Mon Sep 17 00:00:00 2001 From: David Mollitor Date: Sun, 12 Jan 2020 22:40:53 -0500 Subject: [PATCH 2/2] Fix checkstyle issues --- .../org/apache/kafka/clients/admin/KafkaAdminClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 0111e356b49f8..f62e3b714813b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2242,10 +2242,10 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection> partitionsByBroker = new HashMap<>(); - for (TopicPartitionReplica replica : replicas) { + for (TopicPartitionReplica replica : replicas) { partitionsByBroker.computeIfAbsent(replica.brokerId(), key -> new HashSet<>()) .add(new TopicPartition(replica.topic(), replica.partition())); - } + } final long now = time.milliseconds(); for (Map.Entry> entry: partitionsByBroker.entrySet()) { @@ -2413,7 +2413,7 @@ void handleResponse(AbstractResponse abstractResponse) { } else { Node node = cluster.leaderFor(entry.getKey()); if (node != null) { - leaders.computeIfAbsent(node, key -> new HashMap<>()).put(entry.getKey(), + leaders.computeIfAbsent(node, key -> new HashMap<>()).put(entry.getKey(), entry.getValue().beforeOffset()); } else { future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());