diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml index bba3782b8547..bac71e735086 100644 --- a/.idea/inspectionProfiles/Druid.xml +++ b/.idea/inspectionProfiles/Druid.xml @@ -311,6 +311,28 @@ + + + + + + + + + + + + + + + + + + + + + + diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java index deb4eafd0466..530feb855b60 100644 --- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java @@ -21,10 +21,12 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; +import org.apache.druid.java.util.common.ISE; import java.util.AbstractCollection; import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Spliterator; @@ -85,6 +87,8 @@ public static TreeSet newTreeSet(Comparator comparator, Iterab /** * Returns a transformed map from the given input map where the value is modified based on the given valueMapper * function. + * Unlike {@link Maps#transformValues}, this method applies the mapping function eagerly to all key-value pairs + * in the source map and returns a new {@link HashMap}, while {@link Maps#transformValues} returns a lazy map view. */ public static Map mapValues(Map map, Function valueMapper) { @@ -93,6 +97,25 @@ public static Map mapValues(Map map, Function val return result; } + /** + * Returns a transformed map from the given input map where the key is modified based on the given keyMapper + * function. This method fails if keys collide after applying the given keyMapper function and + * throws a IllegalStateException. 
+ * + * @throws ISE if key collisions occur while applying the specified keyMapper + */ + public static Map mapKeys(Map map, Function keyMapper) + { + final Map result = Maps.newHashMapWithExpectedSize(map.size()); + map.forEach((k, v) -> { + final K2 k2 = keyMapper.apply(k); + if (result.putIfAbsent(k2, v) != null) { + throw new ISE("Conflicting key[%s] calculated via keyMapper for original key[%s]", k2, k); + } + }); + return result; + } + private CollectionUtils() { } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index bf0580cd7ad0..3b34454c8526 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -39,6 +39,7 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.utils.CircularBuffer; +import org.apache.druid.utils.CollectionUtils; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.TopicPartition; @@ -54,7 +55,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * Kafka indexing task runner supporting incremental segments publishing @@ -163,12 +163,10 @@ private void possiblyResetOffsetsOrWait( } if (doReset) { - sendResetRequestAndWait(resetPartitions.entrySet() - .stream() - .collect(Collectors.toMap(x -> StreamPartition.of( - x.getKey().topic(), - x.getKey().partition() - ), Map.Entry::getValue)), taskToolbox); + sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, 
streamPartition -> StreamPartition.of( + streamPartition.topic(), + streamPartition.partition() + )), taskToolbox); } else { log.warn("Retrying in %dms", task.getPollRetryMs()); pollRetryLock.lockInterruptibly(); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index c7696173ad58..e9c7d2a2558e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -266,6 +266,8 @@ protected List> createIndexTasks( @Override + // suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here + @SuppressWarnings("SSBasedInspection") protected Map getLagPerPartition(Map currentOffsets) { return currentOffsets diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index b820b23012dd..ee3a56d8b453 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -45,6 +45,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.EntryExistsException; +import org.apache.druid.utils.CollectionUtils; import java.util.ArrayList; import java.util.Collection; @@ -621,12 +622,7 @@ private Map getDeltaValues(Map total, Map getSuccessfulTaskCount() { - Map total = totalSuccessfulTaskCount.entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> e.getValue().get() - )); + Map total = 
CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get); Map delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); prevTotalSuccessfulTaskCount = total; return delta; @@ -634,12 +630,7 @@ public Map getSuccessfulTaskCount() public Map getFailedTaskCount() { - Map total = totalFailedTaskCount.entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> e.getValue().get() - )); + Map total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get); Map delta = getDeltaValues(total, prevTotalFailedTaskCount); prevTotalFailedTaskCount = total; return delta; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index f7de882fda15..a9e894112d17 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -80,6 +80,7 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CircularBuffer; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -1274,8 +1275,10 @@ protected void sendResetRequestAndWait( ) throws IOException { - Map partitionOffsetMap = outOfRangePartitions - .entrySet().stream().collect(Collectors.toMap(x -> x.getKey().getPartitionId(), Map.Entry::getValue)); + Map partitionOffsetMap = CollectionUtils.mapKeys( + outOfRangePartitions, + StreamPartition::getPartitionId + ); boolean result = taskToolbox .getTaskActionClient() diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java index 
281356c66caf..cfd6578b1ae1 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java @@ -35,13 +35,13 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; import java.util.Comparator; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; public class ExpressionPostAggregator implements PostAggregator { @@ -187,12 +187,7 @@ public ExpressionPostAggregator decorate(final Map ag expression, ordering, macroTable, - aggregators.entrySet().stream().collect( - Collectors.toMap( - entry -> entry.getKey(), - entry -> entry.getValue()::finalizeComputation - ) - ), + CollectionUtils.mapValues(aggregators, aggregatorFactory -> obj -> aggregatorFactory.finalizeComputation(obj)), parsed, dependentFields ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index bfabeed7d772..9ef242097bbc 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -43,6 +43,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -689,14 +690,7 @@ WrappedCommitter wrapCommitter(final Committer committer) ) ) ), - snapshot.entrySet() - .stream() 
- .collect( - Collectors.toMap( - Entry::getKey, - e -> e.getValue().lastSegmentId - ) - ), + CollectionUtils.mapValues(snapshot, segmentsForSequence -> segmentsForSequence.lastSegmentId), committer.getMetadata() ); diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index ecbb0d496861..529a37f68342 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -32,12 +32,11 @@ import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; /** * This class is responsible for managing data sources and their states like timeline, total segment size, and number of @@ -115,8 +114,7 @@ Map getDataSources() */ public Map getDataSourceSizes() { - return dataSources.entrySet().stream() - .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getTotalSegmentSize())); + return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize); } /** @@ -127,8 +125,7 @@ public Map getDataSourceSizes() */ public Map getDataSourceCounts() { - return dataSources.entrySet().stream() - .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getNumSegments())); + return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments); } public boolean isSegmentCached(final DataSegment segment) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java index 2001b3fa4e7f..6f52bf8d971c 100644 --- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java @@ -36,7 +36,6 @@ import java.util.NavigableSet; import java.util.Set; import java.util.TreeSet; -import java.util.stream.Collectors; /** * Contains a representation of the current state of the cluster by tier. @@ -69,16 +68,13 @@ private DruidCluster( ) { this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes); - this.historicals = historicals - .entrySet() - .stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - (Map.Entry> e) -> - CollectionUtils.newTreeSet(Comparator.reverseOrder(), e.getValue()) - ) - ); + this.historicals = CollectionUtils.mapValues( + historicals, + holders -> CollectionUtils.newTreeSet( + Comparator.reverseOrder(), + holders + ) + ); } public void add(ServerHolder serverHolder) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java index 63bf3c831933..84bd317180aa 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java @@ -21,11 +21,11 @@ import com.google.common.base.Preconditions; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CollectionUtils; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; public class ClusterCostCache { @@ -82,10 +82,7 @@ public void removeServer(String serverName) public ClusterCostCache build() { return new ClusterCostCache( - serversCostCache - .entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())) + CollectionUtils.mapValues(serversCostCache, ServerCostCache.Builder::build) ); } } diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java index 1310b472e7ed..7897569197bc 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java @@ -21,10 +21,10 @@ import com.google.common.base.Preconditions; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CollectionUtils; import java.util.HashMap; import java.util.Map; -import java.util.stream.Collectors; public class ServerCostCache { @@ -89,10 +89,7 @@ public ServerCostCache build() { return new ServerCostCache( allSegmentsCostCache.build(), - segmentsPerDataSource - .entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())) + CollectionUtils.mapValues(segmentsPerDataSource, SegmentsCostCache.Builder::build) ); } }