From 8b3f6aee7256e93ad9b465df840f97e03687f589 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 10 Jul 2019 14:53:14 -0700 Subject: [PATCH 1/5] doc updates and changes to use the CollectionUtils.mapValues utility method --- .../org/apache/druid/utils/CollectionUtils.java | 14 ++++++++++++++ ...ncrementalPublishingKafkaIndexTaskRunner.java | 12 +++++------- .../druid/indexing/common/task/IndexTask.java | 4 ++-- .../druid/indexing/overlord/TaskQueue.java | 15 +++------------ .../SeekableStreamIndexTaskRunner.java | 7 +++++-- .../metadata/SQLMetadataSegmentManager.java | 12 +++++++----- .../appenderator/BaseAppenderatorDriver.java | 10 ++-------- .../org/apache/druid/server/SegmentManager.java | 9 +++------ .../druid/server/coordinator/DruidCluster.java | 16 +++++++--------- .../coordinator/cost/ClusterCostCache.java | 7 ++----- .../server/coordinator/cost/ServerCostCache.java | 7 ++----- 11 files changed, 52 insertions(+), 61 deletions(-) diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java index af3cf077f430..364661bf5ef6 100644 --- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java @@ -23,6 +23,7 @@ import java.util.AbstractCollection; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Spliterator; @@ -75,6 +76,8 @@ public int size() /** * Returns a transformed map from the given input map where the value is modified based on the given valueMapper * function. + * Unlike {@link Maps#transformValues}, this method applies the mapping function eagerly to all key-value pairs + * in the source map and returns a new {@link HashMap}, while {@link Maps#transformValues} returns a lazy map view. */ public static Map mapValues(Map map, Function valueMapper) { @@ -83,6 +86,17 @@ public static Map mapValues(Map map, Function val return result; } + /** + * Returns a transformed map from the given input map where the key is modified based on the given keyMapper + * function. + */ + public static Map mapKeys(Map map, Function keyMapper) + { + final Map result = Maps.newHashMapWithExpectedSize(map.size()); + map.forEach((k, v) -> result.put(keyMapper.apply(k), v)); + return result; + } + private CollectionUtils() { } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index bf0580cd7ad0..46570aada5ca 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -39,6 +39,7 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.utils.CircularBuffer; +import org.apache.druid.utils.CollectionUtils; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.TopicPartition; @@ -54,7 +55,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * Kafka indexing task runner supporting incremental segments publishing @@ -163,12 +163,10 @@ private void possiblyResetOffsetsOrWait( } if (doReset) { - sendResetRequestAndWait(resetPartitions.entrySet() - .stream() - .collect(Collectors.toMap(x -> StreamPartition.of( - x.getKey().topic(), - x.getKey().partition() - ), Map.Entry::getValue)), taskToolbox); + sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, k -> StreamPartition.of( + k.topic(), + k.partition() + )), taskToolbox); } else { log.warn("Retrying in %dms", task.getPollRetryMs()); pollRetryLock.lockInterruptibly(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index e766effb4125..f975db6d6248 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -91,6 +91,7 @@ import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.utils.CircularBuffer; +import org.apache.druid.utils.CollectionUtils; import org.codehaus.plexus.util.FileUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -444,8 +445,7 @@ public TaskStatus run(final TaskToolbox toolbox) toolbox.getTaskActionClient(), intervals ); - versions = locks.entrySet().stream() - .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getVersion())); + versions = CollectionUtils.mapValues(locks, v -> v.getVersion()); dataSchema = ingestionSchema.getDataSchema().withGranularitySpec( ingestionSchema.getDataSchema() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index b820b23012dd..0fcc24acdbcf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -45,6 +45,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.EntryExistsException; +import org.apache.druid.utils.CollectionUtils; import java.util.ArrayList; import java.util.Collection; @@ -621,12 +622,7 @@ private Map getDeltaValues(Map total, Map getSuccessfulTaskCount() { - Map total = totalSuccessfulTaskCount.entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> e.getValue().get() - )); + Map total = CollectionUtils.mapValues(totalSuccessfulTaskCount, v -> v.get()); Map delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); prevTotalSuccessfulTaskCount = total; return delta; @@ -634,12 +630,7 @@ public Map getSuccessfulTaskCount() public Map getFailedTaskCount() { - Map total = totalFailedTaskCount.entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> e.getValue().get() - )); + Map total = CollectionUtils.mapValues(totalFailedTaskCount, v -> v.get()); Map delta = getDeltaValues(total, prevTotalFailedTaskCount); prevTotalFailedTaskCount = total; return delta; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index f7de882fda15..a40eef0ab0fc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -80,6 +80,7 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CircularBuffer; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -1274,8 +1275,10 @@ protected void sendResetRequestAndWait( ) throws IOException { - Map partitionOffsetMap = outOfRangePartitions - .entrySet().stream().collect(Collectors.toMap(x -> x.getKey().getPartitionId(), Map.Entry::getValue)); + Map partitionOffsetMap = CollectionUtils.mapKeys( + outOfRangePartitions, + k -> k.getPartitionId() + ); boolean result = taskToolbox .getTaskActionClient() diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index bb8514291df1..5a8e3487ae4b 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -105,11 +105,13 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private final Supplier dbTables; private final SQLMetadataConnector connector; - // Volatile since this reference is reassigned in "poll" and then read from in other threads. - // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty dataSources map and - // empty overshadowedSegments set). - // Note that this is not simply a lazy-initialized variable: it starts off as null, and may transition between - // null and nonnull multiple times as stop() and start() are called. + /** + * Marked "volatile" since this reference is reassigned in "poll" and then read from in other threads. + * Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty dataSources map and + * empty overshadowedSegments set). + * Note that this is not simply a lazy-initialized variable: it starts off as null, and may transition between + * null and nonnull multiple times as {@link #stop()} and {@link #start()} are called. + */ @Nullable private volatile DataSourcesSnapshot dataSourcesSnapshot = null; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index bfabeed7d772..85e3ed93e3c4 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -43,6 +43,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -689,14 +690,7 @@ WrappedCommitter wrapCommitter(final Committer committer) ) ) ), - snapshot.entrySet() - .stream() - .collect( - Collectors.toMap( - Entry::getKey, - e -> e.getValue().lastSegmentId - ) - ), + CollectionUtils.mapValues(snapshot, v -> v.lastSegmentId), committer.getMetadata() ); diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index ecbb0d496861..3a73a2b8246a 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -32,12 +32,11 @@ import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; /** * This class is responsible for managing data sources and their states like timeline, total segment size, and number of @@ -115,8 +114,7 @@ Map getDataSources() */ public Map getDataSourceSizes() { - return dataSources.entrySet().stream() - .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getTotalSegmentSize())); + return CollectionUtils.mapValues(dataSources, v -> v.getTotalSegmentSize()); } /** @@ -127,8 +125,7 @@ public Map getDataSourceSizes() */ public Map getDataSourceCounts() { - return dataSources.entrySet().stream() - .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getNumSegments())); + return CollectionUtils.mapValues(dataSources, v -> v.getNumSegments()); } public boolean isSegmentCached(final DataSegment segment) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java index 586f92b2dd79..ab804a1c2bfb 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; import java.util.ArrayList; @@ -59,15 +60,12 @@ public DruidCluster( ) { this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes); - this.historicals = historicals - .entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> StreamSupport - .stream(e.getValue().spliterator(), false) - .collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) - )); + this.historicals = CollectionUtils.mapValues( + historicals, + v -> StreamSupport + .stream(v.spliterator(), false) + .collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) + ); } public void add(ServerHolder serverHolder) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java index 63bf3c831933..4354d8d2cd72 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java @@ -21,11 +21,11 @@ import com.google.common.base.Preconditions; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CollectionUtils; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; public class ClusterCostCache { @@ -82,10 +82,7 @@ public void removeServer(String serverName) public ClusterCostCache build() { return new ClusterCostCache( - serversCostCache - .entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())) + CollectionUtils.mapValues(serversCostCache, v -> v.build()) ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java index 1310b472e7ed..166e138cd2ca 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java @@ -21,10 +21,10 @@ import com.google.common.base.Preconditions; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CollectionUtils; import java.util.HashMap; import java.util.Map; -import java.util.stream.Collectors; public class ServerCostCache { @@ -89,10 +89,7 @@ public ServerCostCache build() { return new ServerCostCache( allSegmentsCostCache.build(), - segmentsPerDataSource - .entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())) + CollectionUtils.mapValues(segmentsPerDataSource, v -> v.build()) ); } } From dcdf128c34af2b7dfa9e85952bcaa308238d5a36 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 11 Jul 2019 16:40:12 -0700 Subject: [PATCH 2/5] Add Structural Search patterns to intelliJ --- .idea/inspectionProfiles/Druid.xml | 11 +++++++++++ .../indexing/kafka/supervisor/KafkaSupervisor.java | 1 + .../aggregation/post/ExpressionPostAggregator.java | 9 ++------- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml index 4de1e05a5b69..0ef0809f91b2 100644 --- a/.idea/inspectionProfiles/Druid.xml +++ b/.idea/inspectionProfiles/Druid.xml @@ -311,6 +311,17 @@ + + + + + + + + + + + diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index c7696173ad58..a07bf41d8f63 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -266,6 +266,7 @@ protected List> createIndexTasks( @Override + @SuppressWarnings("SSBasedInspection") protected Map getLagPerPartition(Map currentOffsets) { return currentOffsets diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java index 281356c66caf..9f7336afc6c5 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java @@ -35,13 +35,13 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; import java.util.Comparator; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; public class ExpressionPostAggregator implements PostAggregator { @@ -187,12 +187,7 @@ public ExpressionPostAggregator decorate(final Map ag expression, ordering, macroTable, - aggregators.entrySet().stream().collect( - Collectors.toMap( - entry -> entry.getKey(), - entry -> entry.getValue()::finalizeComputation - ) - ), + CollectionUtils.mapValues(aggregators, v -> o -> v.finalizeComputation(o)), parsed, dependentFields ); From 62349d69855178ff068db945ddbc31ab101dc551 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sun, 14 Jul 2019 23:46:21 -0700 Subject: [PATCH 3/5] refactoring from PR comments --- .idea/inspectionProfiles/Druid.xml | 11 +++++++++++ .../org/apache/druid/utils/CollectionUtils.java | 15 +++++++++++++-- ...IncrementalPublishingKafkaIndexTaskRunner.java | 6 +++--- .../kafka/supervisor/KafkaSupervisor.java | 1 + .../druid/indexing/common/task/IndexTask.java | 2 +- .../apache/druid/indexing/overlord/TaskQueue.java | 4 ++-- .../SeekableStreamIndexTaskRunner.java | 2 +- .../post/ExpressionPostAggregator.java | 2 +- .../druid/metadata/SQLMetadataSegmentManager.java | 2 +- .../appenderator/BaseAppenderatorDriver.java | 2 +- .../org/apache/druid/server/SegmentManager.java | 4 ++-- .../druid/server/coordinator/DruidCluster.java | 4 ++-- .../server/coordinator/cost/ClusterCostCache.java | 2 +- .../server/coordinator/cost/ServerCostCache.java | 2 +- 14 files changed, 41 insertions(+), 18 deletions(-) diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml index 0ef0809f91b2..1f87af694767 100644 --- a/.idea/inspectionProfiles/Druid.xml +++ b/.idea/inspectionProfiles/Druid.xml @@ -322,6 +322,17 @@ + + + + + + + + + + + diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java index 364661bf5ef6..356b04d9b436 100644 --- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java @@ -20,6 +20,7 @@ package org.apache.druid.utils; import com.google.common.collect.Maps; +import org.apache.druid.java.util.common.ISE; import java.util.AbstractCollection; import java.util.Collection; @@ -88,12 +89,22 @@ public static Map mapValues(Map map, Function val /** * Returns a transformed map from the given input map where the key is modified based on the given keyMapper - * function. + * function. This method fails if keys collide after applying the given keyMapper function and + * throws a IllegalStateException. + * + * @throws ISE if key collisions occur while applying specified keyMapper */ + public static Map mapKeys(Map map, Function keyMapper) { final Map result = Maps.newHashMapWithExpectedSize(map.size()); - map.forEach((k, v) -> result.put(keyMapper.apply(k), v)); + map.forEach((k, v) -> { + final K2 k2 = keyMapper.apply(k); + if (result.containsKey(k2)) { + throw new ISE("Conflicting key[%s] calculated via keyMapper for original key[%s]", k2, k); + } + result.put(k2, v); + }); return result; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index 46570aada5ca..3b34454c8526 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -163,9 +163,9 @@ private void possiblyResetOffsetsOrWait( } if (doReset) { - sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, k -> StreamPartition.of( - k.topic(), - k.partition() + sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, streamPartition -> StreamPartition.of( + streamPartition.topic(), + streamPartition.partition() )), taskToolbox); } else { log.warn("Retrying in %dms", task.getPollRetryMs()); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index a07bf41d8f63..e9c7d2a2558e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -266,6 +266,7 @@ protected List> createIndexTasks( @Override + // suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here @SuppressWarnings("SSBasedInspection") protected Map getLagPerPartition(Map currentOffsets) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index f975db6d6248..07a507af140d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -445,7 +445,7 @@ public TaskStatus run(final TaskToolbox toolbox) toolbox.getTaskActionClient(), intervals ); - versions = CollectionUtils.mapValues(locks, v -> v.getVersion()); + versions = CollectionUtils.mapValues(locks, TaskLock::getVersion); dataSchema = ingestionSchema.getDataSchema().withGranularitySpec( ingestionSchema.getDataSchema() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 0fcc24acdbcf..ee3a56d8b453 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -622,7 +622,7 @@ private Map getDeltaValues(Map total, Map getSuccessfulTaskCount() { - Map total = CollectionUtils.mapValues(totalSuccessfulTaskCount, v -> v.get()); + Map total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get); Map delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); prevTotalSuccessfulTaskCount = total; return delta; @@ -630,7 +630,7 @@ public Map getSuccessfulTaskCount() public Map getFailedTaskCount() { - Map total = CollectionUtils.mapValues(totalFailedTaskCount, v -> v.get()); + Map total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get); Map delta = getDeltaValues(total, prevTotalFailedTaskCount); prevTotalFailedTaskCount = total; return delta; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index a40eef0ab0fc..a9e894112d17 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1277,7 +1277,7 @@ protected void sendResetRequestAndWait( { Map partitionOffsetMap = CollectionUtils.mapKeys( outOfRangePartitions, - k -> k.getPartitionId() + StreamPartition::getPartitionId ); boolean result = taskToolbox diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java index 9f7336afc6c5..cfd6578b1ae1 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java @@ -187,7 +187,7 @@ public ExpressionPostAggregator decorate(final Map ag expression, ordering, macroTable, - CollectionUtils.mapValues(aggregators, v -> o -> v.finalizeComputation(o)), + CollectionUtils.mapValues(aggregators, aggregatorFactory -> obj -> aggregatorFactory.finalizeComputation(obj)), parsed, dependentFields ); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index 5a8e3487ae4b..755f4c211ea2 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -723,7 +723,7 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE // https://github.com/apache/incubator-druid/pull/7653 are in. final Map updatedDataSources = CollectionUtils.mapValues( newDataSources, - v -> v.toImmutableDruidDataSource() + DruidDataSource::toImmutableDruidDataSource ); dataSourcesSnapshot = new DataSourcesSnapshot(updatedDataSources); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 85e3ed93e3c4..9ef242097bbc 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -690,7 +690,7 @@ WrappedCommitter wrapCommitter(final Committer committer) ) ) ), - CollectionUtils.mapValues(snapshot, v -> v.lastSegmentId), + CollectionUtils.mapValues(snapshot, segmentsForSequence -> segmentsForSequence.lastSegmentId), committer.getMetadata() ); diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 3a73a2b8246a..529a37f68342 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -114,7 +114,7 @@ Map getDataSources() */ public Map getDataSourceSizes() { - return CollectionUtils.mapValues(dataSources, v -> v.getTotalSegmentSize()); + return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize); } /** @@ -125,7 +125,7 @@ public Map getDataSourceSizes() */ public Map getDataSourceCounts() { - return CollectionUtils.mapValues(dataSources, v -> v.getNumSegments()); + return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments); } public boolean isSegmentCached(final DataSegment segment) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java index ab804a1c2bfb..991117e04e94 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java @@ -62,8 +62,8 @@ public DruidCluster( this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes); this.historicals = CollectionUtils.mapValues( historicals, - v -> StreamSupport - .stream(v.spliterator(), false) + holders -> StreamSupport + .stream(holders.spliterator(), false) .collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java index 4354d8d2cd72..84bd317180aa 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java @@ -82,7 +82,7 @@ public void removeServer(String serverName) public ClusterCostCache build() { return new ClusterCostCache( - CollectionUtils.mapValues(serversCostCache, v -> v.build()) + CollectionUtils.mapValues(serversCostCache, ServerCostCache.Builder::build) ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java index 166e138cd2ca..7897569197bc 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java @@ -89,7 +89,7 @@ public ServerCostCache build() { return new ServerCostCache( allSegmentsCostCache.build(), - CollectionUtils.mapValues(segmentsPerDataSource, v -> v.build()) + CollectionUtils.mapValues(segmentsPerDataSource, SegmentsCostCache.Builder::build) ); } } From 058198b9215ec414e551dd53334a7fd91e2c10e4 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 15 Jul 2019 10:31:55 -0700 Subject: [PATCH 4/5] put -> putIfAbsent --- core/src/main/java/org/apache/druid/utils/CollectionUtils.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java index 356b04d9b436..4086b5ccb2d1 100644 --- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java @@ -94,7 +94,6 @@ public static Map mapValues(Map map, Function val * * @throws ISE if key collisions occur while applying specified keyMapper */ - public static Map mapKeys(Map map, Function keyMapper) { final Map result = Maps.newHashMapWithExpectedSize(map.size()); @@ -103,7 +102,7 @@ public static Map mapKeys(Map map, Function keyMa if (result.containsKey(k2)) { throw new ISE("Conflicting key[%s] calculated via keyMapper for original key[%s]", k2, k); } - result.put(k2, v); + result.putIfAbsent(k2, v); }); return result; } From 00f5a5ea30bdccb0a2347ceae0d08e5c4500e059 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 16 Jul 2019 09:33:38 -0700 Subject: [PATCH 5/5] do single key lookup --- core/src/main/java/org/apache/druid/utils/CollectionUtils.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java index 4086b5ccb2d1..4849ae494a07 100644 --- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java @@ -99,10 +99,9 @@ public static Map mapKeys(Map map, Function keyMa final Map result = Maps.newHashMapWithExpectedSize(map.size()); map.forEach((k, v) -> { final K2 k2 = keyMapper.apply(k); - if (result.containsKey(k2)) { + if (result.putIfAbsent(k2, v) != null) { throw new ISE("Conflicting key[%s] calculated via keyMapper for original key[%s]", k2, k); } - result.putIfAbsent(k2, v); }); return result; }