From a6191125d3acdc662cee70518c43761c7e82a595 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Sat, 6 Jul 2019 22:57:21 +0530 Subject: [PATCH 01/29] #7641 - Changing segment distribution algorithm to distribute segments to multiple segment cache locations --- .../SegmentLoaderLocalCacheManager.java | 23 +++- .../SegmentLoaderLocalCacheManagerTest.java | 118 ++++++++++++++++++ 2 files changed, 135 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 327db5d2cdd0..68e6c8da84c5 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; import com.google.common.primitives.Longs; import com.google.inject.Inject; import org.apache.commons.io.FileUtils; @@ -35,6 +36,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -51,6 +53,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader private final ObjectMapper jsonMapper; private final List locations; + private Iterator cyclicIterator; // This directoryWriteRemoveLock is used when creating or removing a directory private final Object directoryWriteRemoveLock = new Object(); @@ -102,6 +105,8 @@ public SegmentLoaderLocalCacheManager( ); } locations.sort(COMPARATOR); + // cyclicIterator remembers the marker internally + cyclicIterator = Iterators.cycle(locations); } @Override @@ -179,7 +184,13 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException */ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { - for (StorageLocation loc : locations) { + int numLocationsToTry = locations.size(); + + while (cyclicIterator.hasNext() && numLocationsToTry > 0) { + + StorageLocation loc = cyclicIterator.next(); + numLocationsToTry--; + if (loc.canHandle(segment)) { File storageDir = new File(loc.getPath(), storageDirStr); @@ -189,12 +200,12 @@ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storage } catch (SegmentLoadingException e) { log.makeAlert( - e, - "Failed to load segment in current location %s, try next location if any", - loc.getPath().getAbsolutePath() + e, + "Failed to load segment in current location %s, try next location if any", + loc.getPath().getAbsolutePath() ) - .addData("location", loc.getPath().getAbsolutePath()) - .emit(); + .addData("location", loc.getPath().getAbsolutePath()) + .emit(); cleanupCacheFiles(loc.getPath(), storageDir); } diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index 5a54f8ad4d62..6748af258fd9 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -410,4 +410,122 @@ private DataSegment dataSegmentWithInterval(String intervalStr) .size(10L) .build(); } + + @Test + public void testSegmentDistributionToMultipleLocations() throws Exception{ + + final List locations = new ArrayList<>(); + final StorageLocationConfig locationConfig = createStorageLocationConfig("local_storage_folder", 10000000000L, true); + final StorageLocationConfig locationConfig2 = createStorageLocationConfig("local_storage_folder2", 1000000000L, true); + final StorageLocationConfig locationConfig3 = createStorageLocationConfig("local_storage_folder3", 1000000000L, true); + locations.add(locationConfig); + locations.add(locationConfig2); + locations.add(locationConfig3); + + manager = new SegmentLoaderLocalCacheManager( + TestHelper.getTestIndexIO(), + new SegmentLoaderConfig().withLocations(locations), + jsonMapper + ); + final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder"); + + // Segment 1 should be downloaded in local_storage_folder + final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + // manually create a local segment under segmentSrcFolder + createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); + + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + + File segmentFile = manager.getSegmentFiles(segmentToDownload); + Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + + manager.cleanup(segmentToDownload); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload)); + + // Segment 2 should be downloaded in local_storage_folder2 + final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + // manually create a local segment under segmentSrcFolder + createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); + + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + + File segmentFile2 = manager.getSegmentFiles(segmentToDownload2); + Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + + manager.cleanup(segmentToDownload2); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload2)); + + // Segment 3 should be downloaded in local_storage_folder3 + final DataSegment segmentToDownload3 = dataSegmentWithInterval("2014-12-20T00:00:00Z/P1D").withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + // manually create a local segment under segmentSrcFolder + createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); + + File segmentFile3 = manager.getSegmentFiles(segmentToDownload3); + Assert.assertTrue(segmentFile3.getAbsolutePath().contains("/local_storage_folder3/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload3)); + + manager.cleanup(segmentToDownload3); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3)); + + // Segment 1 should be downloaded in local_storage_folder again, asserting round robin distribution of segments + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + + File segmentFile1 = manager.getSegmentFiles(segmentToDownload); + Assert.assertTrue(segmentFile1.getAbsolutePath().contains("/local_storage_folder/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + manager.cleanup(segmentToDownload); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload)); + } + + private void createLocalSegmentFile(File segmentSrcFolder, String localSegmentPath) throws Exception + { + // manually create a local segment under segmentSrcFolder + final File localSegmentFile = new File(segmentSrcFolder, localSegmentPath); + localSegmentFile.mkdirs(); + final File indexZip = new File(localSegmentFile, "index.zip"); + indexZip.createNewFile(); + } + + private StorageLocationConfig createStorageLocationConfig(String localPath, long maxSize, boolean writable) throws Exception + { + final StorageLocationConfig locationConfig = new StorageLocationConfig(); + final File localStorageFolder = tmpFolder.newFolder(localPath); + localStorageFolder.setWritable(writable); + locationConfig.setPath(localStorageFolder); + locationConfig.setMaxSize(maxSize); + return locationConfig; + } + } From 08b625605843e6cfb2f92a03a725983e11fa7864 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Sat, 6 Jul 2019 23:12:30 +0530 Subject: [PATCH 02/29] Fixing indentation --- .../loading/SegmentLoaderLocalCacheManagerTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index 6748af258fd9..5743636022e0 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -412,8 +412,8 @@ private DataSegment dataSegmentWithInterval(String intervalStr) } @Test - public void testSegmentDistributionToMultipleLocations() throws Exception{ - + public void testSegmentDistributionToMultipleLocations() throws Exception + { final List locations = new ArrayList<>(); final StorageLocationConfig locationConfig = createStorageLocationConfig("local_storage_folder", 10000000000L, true); final StorageLocationConfig locationConfig2 = createStorageLocationConfig("local_storage_folder2", 1000000000L, true); @@ -431,7 +431,7 @@ public void testSegmentDistributionToMultipleLocations() throws Exception{ // Segment 1 should be downloaded in local_storage_folder final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec( - ImmutableMap.of( + ImmutableMap.of( "type", "local", "path", @@ -455,7 +455,7 @@ public void testSegmentDistributionToMultipleLocations() throws Exception{ // Segment 2 should be downloaded in local_storage_folder2 final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec( - ImmutableMap.of( + ImmutableMap.of( "type", "local", "path", @@ -479,7 +479,7 @@ public void testSegmentDistributionToMultipleLocations() throws Exception{ // Segment 3 should be downloaded in local_storage_folder3 final DataSegment segmentToDownload3 = dataSegmentWithInterval("2014-12-20T00:00:00Z/P1D").withLoadSpec( - ImmutableMap.of( + ImmutableMap.of( "type", "local", "path", From c2bd858ba0affbe3ebf62c7de025a0b2e27ca480 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Thu, 11 Jul 2019 21:53:19 +0530 Subject: [PATCH 03/29] WIP --- ...esUsedStorageLocationSelectorStrategy.java | 32 ++++++++++++ ...dRobinStorageLocationSelectorStrategy.java | 52 +++++++++++++++++++ .../segment/loading/SegmentLoaderConfig.java | 9 ++++ .../SegmentLoaderLocalCacheManager.java | 41 +++++++-------- .../StorageLocationSelectorStrategy.java | 39 ++++++++++++++ 5 files changed, 151 insertions(+), 22 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java create mode 100644 server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java create mode 100644 server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java diff --git a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java new file mode 100644 index 000000000000..3d3d8c547cab --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import org.apache.druid.timeline.DataSegment; + +import java.util.List; + +public class LeastBytesUsedStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy +{ + @Override + public StorageLocation select(DataSegment dataSegment, List storageLocationsAvailable) { + return null; + } +} diff --git a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java new file mode 100644 index 000000000000..a10b4f0c78c0 --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import org.apache.druid.timeline.DataSegment; + +import java.util.List; + +/** + * + */ +public class RoundRobinStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy +{ + @Override + public StorageLocation select(DataSegment dataSegment, List storageLocations) + { + StorageLocation bestLocation = null; + + int numLocationsToTry = storageLocations.size(); + + while (cyclicIterator.hasNext() && numLocationsToTry > 0) { + + StorageLocation loc = cyclicIterator.next(); + + numLocationsToTry--; + + if (loc.canHandle(dataSegment)) { + bestLocation = loc; + break; + } + } + + return bestLocation; + } +} diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index 80f0fbc9fdf4..a1cd03d2efd7 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; +import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.ISE; import org.apache.druid.utils.JvmUtils; import org.hibernate.validator.constraints.NotEmpty; @@ -52,6 +53,9 @@ public class SegmentLoaderConfig @JsonProperty("numBootstrapThreads") private Integer numBootstrapThreads = null; + @JsonProperty("locationSelectorStrategy") + private StorageLocationSelectorStrategy locationSelectorStrategy = new RoundRobinStorageLocationSelectorStrategy(); // default strategy if no strategy is specified in the config + @JsonProperty private File infoDir = null; @@ -88,6 +92,11 @@ public int getNumBootstrapThreads() return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads; } + public StorageLocationSelectorStrategy getStorageLocationSelectorStrategy() + { + return locationSelectorStrategy; + } + public File getInfoDir() { if (infoDir == null) { diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 68e6c8da84c5..e6e345203c7f 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -184,33 +184,30 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException */ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { - int numLocationsToTry = locations.size(); + StorageLocationSelectorStrategy strategy = config.getStorageLocationSelectorStrategy(); - while (cyclicIterator.hasNext() && numLocationsToTry > 0) { + StorageLocation selectedLoc = strategy.select(segment, locations); - StorageLocation loc = cyclicIterator.next(); - numLocationsToTry--; + if (null != selectedLoc) { + File storageDir = new File(selectedLoc.getPath(), storageDirStr); - if (loc.canHandle(segment)) { - File storageDir = new File(loc.getPath(), storageDirStr); - - try { - loadInLocationWithStartMarker(segment, storageDir); - return loc; - } - catch (SegmentLoadingException e) { - log.makeAlert( - e, - "Failed to load segment in current location %s, try next location if any", - loc.getPath().getAbsolutePath() - ) - .addData("location", loc.getPath().getAbsolutePath()) - .emit(); - - cleanupCacheFiles(loc.getPath(), storageDir); - } + try { + loadInLocationWithStartMarker(segment, storageDir); + return selectedLoc; + } + catch (SegmentLoadingException e) { + log.makeAlert( + e, + "Failed to load segment in current location %s, try next location if any", + selectedLoc.getPath().getAbsolutePath() + ) + .addData("location", selectedLoc.getPath().getAbsolutePath()) + .emit(); + + cleanupCacheFiles(selectedLoc.getPath(), storageDir); } } + throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getId()); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java new file mode 100644 index 000000000000..bf91472c53e6 --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import org.apache.druid.timeline.DataSegment; + +import java.util.List; + +/** + * + */ +public interface StorageLocationSelectorStrategy +{ + /** + * + * Find the best {@link StorageLocation} to load the given {@link DataSegment} into according to the location selector strategy. + * + * @param storageLocations list of available locations from which a location needs to be picked by the strategy. + * @return The storage location to load the given segment into or null if no location has the capacity to store the given segment. + */ + StorageLocation select(DataSegment dataSegment, List storageLocations); +} From f9fa66f5369dc1907fbb0c42c4d064b8d21b982a Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Thu, 25 Jul 2019 22:23:51 +0530 Subject: [PATCH 04/29] Adding interface for location strategy selection, least bytes used strategy impl, round-robin strategy impl, locationSelectorStrategy config with least bytes used strategy as the default strategy --- ...esUsedStorageLocationSelectorStrategy.java | 37 ++++++++++++++++++- ...dRobinStorageLocationSelectorStrategy.java | 22 +++++++++-- .../segment/loading/SegmentLoaderConfig.java | 3 +- .../SegmentLoaderLocalCacheManager.java | 18 ++++----- .../StorageLocationSelectorStrategy.java | 21 +++++++++-- 5 files changed, 79 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java index 3d3d8c547cab..4b550db9c072 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java @@ -19,14 +19,47 @@ package org.apache.druid.segment.loading; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; +import com.google.common.primitives.Longs; import org.apache.druid.timeline.DataSegment; +import java.util.Comparator; +import java.util.Iterator; import java.util.List; +/** + * A {@link StorageLocation} selector strategy that selects a segment cache location that is least filled each time + * among the available storage locations. + */ public class LeastBytesUsedStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy { + private static final Comparator COMPARATOR = (left, right) -> + Longs.compare(right.available(), left.available()); + + private ImmutableList storageLocations; + + @Override + public void setStorageLocations(ImmutableList storageLocations) { + this.storageLocations = storageLocations; + } + @Override - public StorageLocation select(DataSegment dataSegment, List storageLocationsAvailable) { - return null; + public StorageLocation select(DataSegment dataSegment) { + + StorageLocation bestLocation = null; + + List locations = Ordering.from(COMPARATOR).sortedCopy(this.storageLocations); + + Iterator locIterator = locations.iterator(); + + while(locIterator.hasNext()) { + StorageLocation location = locIterator.next(); + if(location.canHandle(dataSegment)){ + bestLocation = location; + break; + } + } + return bestLocation; } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java index a10b4f0c78c0..5b82b2c6bf2e 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java @@ -19,18 +19,32 @@ package org.apache.druid.segment.loading; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; import org.apache.druid.timeline.DataSegment; -import java.util.List; +import java.util.Iterator; /** - * + * A {@link StorageLocation} selector strategy that selects a segment cache location in a round-robin fashion each time + * among the available storage locations. */ public class RoundRobinStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy { + + private ImmutableList storageLocations; + private Iterator cyclicIterator; + @Override - public StorageLocation select(DataSegment dataSegment, List storageLocations) - { + public void setStorageLocations(ImmutableList storageLocations) { + this.storageLocations = storageLocations; + // cyclicIterator remembers the marker internally + cyclicIterator = Iterators.cycle(storageLocations); + } + + @Override + public StorageLocation select(DataSegment dataSegment) { + StorageLocation bestLocation = null; int numLocationsToTry = storageLocations.size(); diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index a1cd03d2efd7..1802b4ab21de 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -54,7 +54,8 @@ public class SegmentLoaderConfig private Integer numBootstrapThreads = null; @JsonProperty("locationSelectorStrategy") - private StorageLocationSelectorStrategy locationSelectorStrategy = new RoundRobinStorageLocationSelectorStrategy(); // default strategy if no strategy is specified in the config + private StorageLocationSelectorStrategy locationSelectorStrategy = new LeastBytesUsedStorageLocationSelectorStrategy() + ; // default strategy if no strategy is specified in the config @JsonProperty private File infoDir = null; diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 45c899c0d153..9f37a46d1a63 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -21,8 +21,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterators; -import com.google.common.primitives.Longs; +import com.google.common.collect.ImmutableList; + import com.google.inject.Inject; import org.apache.commons.io.FileUtils; import org.apache.druid.guice.annotations.Json; @@ -35,7 +35,6 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -45,8 +44,6 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader { private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); - private static final Comparator COMPARATOR = (left, right) -> - Longs.compare(right.available(), left.available()); private final IndexIO indexIO; private final SegmentLoaderConfig config; @@ -80,6 +77,8 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader */ private final ConcurrentHashMap segmentLocks = new ConcurrentHashMap<>(); + private final StorageLocationSelectorStrategy strategy; + // Note that we only create this via injection in historical and realtime nodes. Peons create these // objects via SegmentLoaderFactory objects, so that they can store segments in task-specific // directories rather than statically configured directories. @@ -104,9 +103,8 @@ public SegmentLoaderLocalCacheManager( ) ); } - locations.sort(COMPARATOR); - // cyclicIterator remembers the marker internally - cyclicIterator = Iterators.cycle(locations); + this.strategy = config.getStorageLocationSelectorStrategy(); + this.strategy.setStorageLocations(ImmutableList.copyOf(locations)); } @Override @@ -185,9 +183,7 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException */ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { - StorageLocationSelectorStrategy strategy = config.getStorageLocationSelectorStrategy(); - - StorageLocation selectedLoc = strategy.select(segment, locations); + StorageLocation selectedLoc = strategy.select(segment); if (null != selectedLoc) { File storageDir = new File(selectedLoc.getPath(), storageDirStr); diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java index bf91472c53e6..26ad62e18a55 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java @@ -19,21 +19,34 @@ package org.apache.druid.segment.loading; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.collect.ImmutableList; import org.apache.druid.timeline.DataSegment; import java.util.List; /** - * + * Interface which represents a strategy to select a {@link StorageLocation} from available segment cache locations + * for segment distribution. */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "tier", defaultImpl = LeastBytesUsedStorageLocationSelectorStrategy.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "leastBytesUsed", value = LeastBytesUsedStorageLocationSelectorStrategy.class), + @JsonSubTypes.Type(name = "roundRobin", value = RoundRobinStorageLocationSelectorStrategy.class) +}) public interface StorageLocationSelectorStrategy { /** - * * Find the best {@link StorageLocation} to load the given {@link DataSegment} into according to the location selector strategy. * - * @param storageLocations list of available locations from which a location needs to be picked by the strategy. * @return The storage location to load the given segment into or null if no location has the capacity to store the given segment. */ - StorageLocation select(DataSegment dataSegment, List storageLocations); + StorageLocation select(DataSegment dataSegment); + + /** + * Sets the storage locations list with the supplied storage locations. + * @param storageLocations storage locations list to be used. + */ + void setStorageLocations(ImmutableList storageLocations); } From fd0c6d620921d9c1729427a328111f9499257042 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Sun, 28 Jul 2019 17:07:14 +0530 Subject: [PATCH 05/29] fixing code style --- .../LeastBytesUsedStorageLocationSelectorStrategy.java | 5 ++--- .../loading/RoundRobinStorageLocationSelectorStrategy.java | 6 ++++-- .../apache/druid/segment/loading/SegmentLoaderConfig.java | 3 +-- .../segment/loading/StorageLocationSelectorStrategy.java | 6 ++---- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java index b7e69a71e66a..459d05b8311c 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; -import com.google.common.primitives.Longs; import org.apache.druid.timeline.DataSegment; import java.util.Comparator; @@ -35,7 +34,7 @@ public class LeastBytesUsedStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy { private static final Comparator COMPARATOR = Comparator - .comparingLong(StorageLocation::currSizeBytes); + .comparingLong(StorageLocation::currSizeBytes); private ImmutableList storageLocations; @@ -55,7 +54,7 @@ public StorageLocation select(DataSegment dataSegment, String storageDirStr) Iterator locIterator = locations.iterator(); - while(locIterator.hasNext()) { + while (locIterator.hasNext()) { StorageLocation location = locIterator.next(); if (null != location.reserve(storageDirStr, dataSegment)) { diff --git a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java index 31653a9be2d9..54f5e937e325 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java @@ -36,14 +36,16 @@ public class RoundRobinStorageLocationSelectorStrategy implements StorageLocatio private Iterator cyclicIterator; @Override - public void setStorageLocations(ImmutableList storageLocations) { + public void setStorageLocations(ImmutableList storageLocations) + { this.storageLocations = storageLocations; // cyclicIterator remembers the marker internally cyclicIterator = Iterators.cycle(storageLocations); } @Override - public StorageLocation select(DataSegment dataSegment, String storageDirStr) { + public StorageLocation select(DataSegment dataSegment, String storageDirStr) + { StorageLocation bestLocation = null; diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index 1802b4ab21de..5175c4e64c09 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; -import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.ISE; import org.apache.druid.utils.JvmUtils; import org.hibernate.validator.constraints.NotEmpty; @@ -55,7 +54,7 @@ public class SegmentLoaderConfig @JsonProperty("locationSelectorStrategy") private StorageLocationSelectorStrategy locationSelectorStrategy = new LeastBytesUsedStorageLocationSelectorStrategy() - ; // default strategy if no strategy is specified in the config + ; // default strategy if no strategy is specified in the config @JsonProperty private File infoDir = null; diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java index 4fdde647d206..a31933197185 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java @@ -24,16 +24,14 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.timeline.DataSegment; -import java.util.List; - /** * Interface which represents a strategy to select a {@link StorageLocation} from available segment cache locations * for segment distribution. */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "tier", defaultImpl = LeastBytesUsedStorageLocationSelectorStrategy.class) @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "leastBytesUsed", value = LeastBytesUsedStorageLocationSelectorStrategy.class), - @JsonSubTypes.Type(name = "roundRobin", value = RoundRobinStorageLocationSelectorStrategy.class) + @JsonSubTypes.Type(name = "leastBytesUsed", value = LeastBytesUsedStorageLocationSelectorStrategy.class), + @JsonSubTypes.Type(name = "roundRobin", value = RoundRobinStorageLocationSelectorStrategy.class) }) public interface StorageLocationSelectorStrategy { From 449e9293439352b81cc5ac5dae877c4cf0ed5eff Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Sun, 28 Jul 2019 18:10:28 +0530 Subject: [PATCH 06/29] Fixing test --- .../apache/druid/segment/loading/SegmentLoaderConfig.java | 4 ++-- .../segment/loading/SegmentLoaderLocalCacheManagerTest.java | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index 5175c4e64c09..df7f44a8c91e 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -53,8 +53,8 @@ public class SegmentLoaderConfig private Integer numBootstrapThreads = null; @JsonProperty("locationSelectorStrategy") - private StorageLocationSelectorStrategy locationSelectorStrategy = new LeastBytesUsedStorageLocationSelectorStrategy() - ; // default strategy if no strategy is specified in the config + private StorageLocationSelectorStrategy locationSelectorStrategy = + new LeastBytesUsedStorageLocationSelectorStrategy(); // default strategy if no strategy is specified in the config @JsonProperty private File infoDir = null; diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index 96328e28c142..cced8873e564 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -502,11 +502,10 @@ private void createLocalSegmentFile(File segmentSrcFolder, String localSegmentPa private StorageLocationConfig createStorageLocationConfig(String localPath, long maxSize, boolean writable) throws Exception { - final StorageLocationConfig locationConfig = new StorageLocationConfig(); + final File localStorageFolder = tmpFolder.newFolder(localPath); localStorageFolder.setWritable(writable); - locationConfig.setPath(localStorageFolder); - locationConfig.setMaxSize(maxSize); + final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, maxSize, 1.0); return locationConfig; } From 33db5069e823fec3fae0a3ba85bb1d0cfd4a4f94 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Tue, 30 Jul 2019 00:57:38 +0530 Subject: [PATCH 07/29] Adding a method visible only for testing, fixing tests --- .../segment/loading/SegmentLoaderConfig.java | 8 +++++ .../SegmentLoaderLocalCacheManager.java | 32 +++++++++---------- .../SegmentLoaderLocalCacheManagerTest.java | 6 ++-- 3 files changed, 27 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index df7f44a8c91e..4ea10e63e286 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.loading; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.druid.java.util.common.ISE; import org.apache.druid.utils.JvmUtils; @@ -124,6 +125,13 @@ public SegmentLoaderConfig withLocations(List locations) return retVal; } + @VisibleForTesting + SegmentLoaderConfig withStorageLocationSelectorStrategy(StorageLocationSelectorStrategy strategy) + { + this.locationSelectorStrategy = strategy; + return this; + } + @Override public String toString() { diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 25430958d77f..be78da4c6364 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -34,7 +34,6 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -49,7 +48,6 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader private final ObjectMapper jsonMapper; private final List locations; - private Iterator cyclicIterator; // This directoryWriteRemoveLock is used when creating or removing a directory private final Object directoryWriteRemoveLock = new Object(); @@ -180,27 +178,27 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException */ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { - StorageLocation selectedLoc = strategy.select(segment, storageDirStr); + StorageLocation loc = strategy.select(segment, storageDirStr); - if (null != selectedLoc) { - File storageDir = new File(selectedLoc.getPath(), storageDirStr); + if (null != loc) { + File storageDir = new File(loc.getPath(), storageDirStr); try { loadInLocationWithStartMarker(segment, storageDir); - return selectedLoc; + return loc; } catch (SegmentLoadingException e) { - log.makeAlert( - e, - "Failed to load segment in current location %s, try next location if any", - selectedLoc.getPath().getAbsolutePath() - ) - .addData("location", selectedLoc.getPath().getAbsolutePath()) - .emit(); - } - finally { - selectedLoc.removeSegmentDir(storageDir, segment); - cleanupCacheFiles(selectedLoc.getPath(), storageDir); + try { + log.makeAlert( + e, + "Failed to load segment in current location %s, try next location if any", + loc.getPath().getAbsolutePath() + ).addData("location", loc.getPath().getAbsolutePath()).emit(); + } + finally { + loc.removeSegmentDir(storageDir, segment); + cleanupCacheFiles(loc.getPath(), storageDir); + } } } throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getId()); diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index cced8873e564..207571af7396 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -394,7 +394,7 @@ private DataSegment dataSegmentWithInterval(String intervalStr) } @Test - public void testSegmentDistributionToMultipleLocations() throws Exception + public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception { final List locations = new ArrayList<>(); final StorageLocationConfig locationConfig = createStorageLocationConfig("local_storage_folder", 10000000000L, true); @@ -406,7 +406,9 @@ public void testSegmentDistributionToMultipleLocations() throws Exception manager = new SegmentLoaderLocalCacheManager( TestHelper.getTestIndexIO(), - new SegmentLoaderConfig().withLocations(locations), + new SegmentLoaderConfig().withLocations(locations).withStorageLocationSelectorStrategy( + new RoundRobinStorageLocationSelectorStrategy() + ), jsonMapper ); final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder"); From 2712aeb9994d9889141ae6bcfb5a998b7c80f58b Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Tue, 30 Jul 2019 18:26:32 +0530 Subject: [PATCH 08/29] 1. Changing the method contract to return an iterator of locations instead of a single best location. 2. Check style fixes --- ...esUsedStorageLocationSelectorStrategy.java | 25 ++--------- ...dRobinStorageLocationSelectorStrategy.java | 24 +---------- .../SegmentLoaderLocalCacheManager.java | 43 ++++++++++++------- .../StorageLocationSelectorStrategy.java | 16 ++++--- 4 files changed, 44 insertions(+), 64 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java index 459d05b8311c..ab6430068ab8 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java @@ -25,7 +25,6 @@ import java.util.Comparator; import java.util.Iterator; -import java.util.List; /** * A {@link StorageLocation} selector strategy that selects a segment cache location that is least filled each time @@ -39,30 +38,14 @@ public class LeastBytesUsedStorageLocationSelectorStrategy implements StorageLoc private ImmutableList storageLocations; @Override - public void setStorageLocations(ImmutableList storageLocations) + public Iterator getLocations(DataSegment dataSegment, String storageDirStr) { - this.storageLocations = storageLocations; + return Ordering.from(COMPARATOR).sortedCopy(this.storageLocations).iterator(); } @Override - public StorageLocation select(DataSegment dataSegment, String storageDirStr) + public void setStorageLocations(ImmutableList storageLocations) { - - StorageLocation bestLocation = null; - - List locations = Ordering.from(COMPARATOR).sortedCopy(this.storageLocations); - - Iterator locIterator = locations.iterator(); - - while (locIterator.hasNext()) { - StorageLocation location = locIterator.next(); - - if (null != location.reserve(storageDirStr, dataSegment)) { - bestLocation = location; - break; - } - - } - return bestLocation; + this.storageLocations = storageLocations; } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java index 54f5e937e325..2125e213c9dd 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java @@ -31,38 +31,18 @@ */ public class RoundRobinStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy { - - private ImmutableList storageLocations; private Iterator cyclicIterator; @Override public void setStorageLocations(ImmutableList storageLocations) { - this.storageLocations = storageLocations; // cyclicIterator remembers the marker internally cyclicIterator = Iterators.cycle(storageLocations); } @Override - public StorageLocation select(DataSegment dataSegment, String storageDirStr) + public Iterator getLocations(DataSegment dataSegment, String storageDirStr) { - - StorageLocation bestLocation = null; - - int numLocationsToTry = storageLocations.size(); - - while (cyclicIterator.hasNext() && numLocationsToTry > 0) { - - StorageLocation loc = cyclicIterator.next(); - - numLocationsToTry--; - - if (null != loc.reserve(storageDirStr, dataSegment)) { - bestLocation = loc; - break; - } - } - - return bestLocation; + return cyclicIterator; } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index be78da4c6364..b62dbae82855 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -34,6 +34,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -175,29 +176,39 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException * location may fail because of IO failure, most likely in two cases:

* 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly

* 2. disk failure, druid can't read/write to this disk anymore + * + * Locations are fetched using {@link StorageLocationSelectorStrategy}. */ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { - StorageLocation loc = strategy.select(segment, storageDirStr); + Iterator locations = strategy.getLocations(segment, storageDirStr); + int numLocationsToTry = this.locations.size(); - if (null != loc) { - File storageDir = new File(loc.getPath(), storageDirStr); + while (locations.hasNext() && numLocationsToTry > 0) { - try { - loadInLocationWithStartMarker(segment, storageDir); - return loc; - } - catch (SegmentLoadingException e) { + StorageLocation loc = locations.next(); + numLocationsToTry--; // This is to avoid the cyclic iterator returned from Round Robin strategy to loop + // indefinitely. + + File storageDir = loc.reserve(storageDirStr, segment); + + if (null != loc) { try { - log.makeAlert( - e, - "Failed to load segment in current location %s, try next location if any", - loc.getPath().getAbsolutePath() - ).addData("location", loc.getPath().getAbsolutePath()).emit(); + loadInLocationWithStartMarker(segment, storageDir); + return loc; } - finally { - loc.removeSegmentDir(storageDir, segment); - cleanupCacheFiles(loc.getPath(), storageDir); + catch (SegmentLoadingException e) { + try { + log.makeAlert( + e, + "Failed to load segment in current location %s, try next location if any", + loc.getPath().getAbsolutePath() + ).addData("location", loc.getPath().getAbsolutePath()).emit(); + } + finally { + loc.removeSegmentDir(storageDir, segment); + cleanupCacheFiles(loc.getPath(), storageDir); + } } } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java index a31933197185..9b4d21183b58 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java @@ -24,9 +24,11 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.timeline.DataSegment; +import java.util.Iterator; + /** - * Interface which represents a strategy to select a {@link StorageLocation} from available segment cache locations - * for segment distribution. + * This interface describes the storage location selection strategy which is responsible for ordering the + * available multiple {@link StorageLocation}s for optimal segment distribution. */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "tier", defaultImpl = LeastBytesUsedStorageLocationSelectorStrategy.class) @JsonSubTypes(value = { @@ -36,11 +38,15 @@ public interface StorageLocationSelectorStrategy { /** - * Find the best {@link StorageLocation} to load the given {@link DataSegment} into according to the location selector strategy. + * Finds the best ordering of the {@link StorageLocation}s to load the given {@link DataSegment} into according to + * the location selector strategy. This method returns an iterator instead of a single best location. The + * caller is responsible for iterating over the locations and calling {@link StorageLocation}'s reserve() method. + * This is because a single location may be problematic like failed disk or might become unwritable for whatever + * reasons. * - * @return The storage location to load the given segment into or null if no location has the capacity to store the given segment. + * @return An iterator of {@link StorageLocation}s from which the callers can iterate and pick a location. */ - StorageLocation select(DataSegment dataSegment, String storageDirStr); + Iterator getLocations(DataSegment dataSegment, String storageDirStr); /** * Sets the storage locations list with the supplied storage locations. From c1e85973d99e7ed7951afb2be1a0f0488f86ad7d Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Wed, 31 Jul 2019 00:54:59 +0530 Subject: [PATCH 09/29] fixing the conditional statement --- .../druid/segment/loading/SegmentLoaderLocalCacheManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index b62dbae82855..073b78043cc3 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -192,7 +192,7 @@ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storage File storageDir = loc.reserve(storageDirStr, segment); - if (null != loc) { + if (null != storageDir) { try { loadInLocationWithStartMarker(segment, storageDir); return loc; From 66f00eef15f642710e84ad01e10e7610c53e7b58 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Sun, 4 Aug 2019 17:19:15 +0530 Subject: [PATCH 10/29] Added testSegmentDistributionUsingLeastBytesUsedStrategy, fixed testSegmentDistributionUsingRoundRobinStrategy --- .../SegmentLoaderLocalCacheManagerTest.java | 193 +++++++++++++++--- 1 file changed, 163 insertions(+), 30 deletions(-) diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index 207571af7396..43f2c09623df 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -372,25 +372,30 @@ public void testEmptyToFullOrder() throws Exception } private DataSegment dataSegmentWithInterval(String intervalStr) + { + return dataSegmentWithInterval(intervalStr, 10L); + } + + private DataSegment dataSegmentWithInterval(String intervalStr, long size) { return DataSegment.builder() - .dataSource("test_segment_loader") - .interval(Intervals.of(intervalStr)) - .loadSpec( - ImmutableMap.of( - "type", - "local", - "path", - "somewhere" - ) - ) - .version("2015-05-27T03:38:35.683Z") - .dimensions(ImmutableList.of()) - .metrics(ImmutableList.of()) - .shardSpec(NoneShardSpec.instance()) - .binaryVersion(9) - .size(10L) - .build(); + .dataSource("test_segment_loader") + .interval(Intervals.of(intervalStr)) + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + "somewhere" + ) + ) + .version("2015-05-27T03:38:35.683Z") + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) + .shardSpec(NoneShardSpec.instance()) + .binaryVersion(9) + .size(size) + .build(); } @Test @@ -414,7 +419,7 @@ public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder"); // Segment 1 should be downloaded in local_storage_folder - final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec( + final DataSegment segmentToDownload1 = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec( ImmutableMap.of( "type", "local", @@ -428,14 +433,14 @@ public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception // manually create a local segment under segmentSrcFolder createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload1)); - File segmentFile = manager.getSegmentFiles(segmentToDownload); + File segmentFile = manager.getSegmentFiles(segmentToDownload1); Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload1)); - manager.cleanup(segmentToDownload); - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload)); + manager.cleanup(segmentToDownload1); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload1)); // Segment 2 should be downloaded in local_storage_folder2 final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec( @@ -474,7 +479,8 @@ public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception ) ); // manually create a local segment under segmentSrcFolder - createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); + createLocalSegmentFile(segmentSrcFolder, + "test_segment_loader/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); File segmentFile3 = manager.getSegmentFiles(segmentToDownload3); Assert.assertTrue(segmentFile3.getAbsolutePath().contains("/local_storage_folder3/")); @@ -483,14 +489,29 @@ public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception manager.cleanup(segmentToDownload3); Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload3)); - // Segment 1 should be downloaded in local_storage_folder again, asserting round robin distribution of segments - Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + // Segment 4 should be downloaded in local_storage_folder again, asserting round robin distribution of segments + final DataSegment segmentToDownload4 = dataSegmentWithInterval("2014-08-20T00:00:00Z/P1D").withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + // manually create a local segment under segmentSrcFolder + createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00" + + ".000Z/2015-05-27T03:38:35.683Z/0"); + + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload4)); - File segmentFile1 = manager.getSegmentFiles(segmentToDownload); + File segmentFile1 = manager.getSegmentFiles(segmentToDownload4); Assert.assertTrue(segmentFile1.getAbsolutePath().contains("/local_storage_folder/")); - Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); - manager.cleanup(segmentToDownload); - Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload)); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload4)); + manager.cleanup(segmentToDownload4); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload4)); } private void createLocalSegmentFile(File segmentSrcFolder, String localSegmentPath) throws Exception @@ -511,4 +532,116 @@ private StorageLocationConfig createStorageLocationConfig(String localPath, long return locationConfig; } + @Test + public void testSegmentDistributionUsingLeastBytesUsedStrategy() throws Exception + { + final List locations = new ArrayList<>(); + final StorageLocationConfig locationConfig = createStorageLocationConfig("local_storage_folder", 10000000000L, + true); + final StorageLocationConfig locationConfig2 = createStorageLocationConfig("local_storage_folder2", 1000000000L, + true); + final StorageLocationConfig locationConfig3 = createStorageLocationConfig("local_storage_folder3", 1000000000L, + true); + locations.add(locationConfig); + locations.add(locationConfig2); + locations.add(locationConfig3); + + manager = new SegmentLoaderLocalCacheManager( + TestHelper.getTestIndexIO(), + new SegmentLoaderConfig().withLocations(locations), + jsonMapper + ); + final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder"); + + // Segment 1 should be downloaded in local_storage_folder, segment1 size 10L + final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D", 10L).withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + // manually create a local segment under segmentSrcFolder + createLocalSegmentFile(segmentSrcFolder, + "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); + + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + + File segmentFile = manager.getSegmentFiles(segmentToDownload); + Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + + // Segment 2 should be downloaded in local_storage_folder2, segment2 size 5L + final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D", 5L).withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + // manually create a local segment under segmentSrcFolder + createLocalSegmentFile(segmentSrcFolder, + "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); + + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + + File segmentFile2 = manager.getSegmentFiles(segmentToDownload2); + Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + + + // Segment 3 should be downloaded in local_storage_folder3, segment3 size 20L + final DataSegment segmentToDownload3 = dataSegmentWithInterval("2014-12-20T00:00:00Z/P1D", 20L).withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + // manually create a local segment under segmentSrcFolder + createLocalSegmentFile(segmentSrcFolder, + "test_segment_loader/2014-12-20T00:00:00.000Z_2014-12-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); + + File segmentFile3 = manager.getSegmentFiles(segmentToDownload3); + Assert.assertTrue(segmentFile3.getAbsolutePath().contains("/local_storage_folder3/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload3)); + + // Now the storage locations local_storage_folder1, local_storage_folder2 and local_storage_folder3 have 10, 5 and + // 20 bytes occupied respectively. The default strategy should pick location2 (as it has least bytes used) for the + // next segment to be downloaded asserting the least bytes used distribution of segments. + final DataSegment segmentToDownload4 = dataSegmentWithInterval("2014-08-20T00:00:00Z/P1D").withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + // manually create a local segment under segmentSrcFolder + createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-08-20T00:00:00.000Z_2014-08-21T00:00:00" + + ".000Z/2015-05-27T03:38:35.683Z/0"); + + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload4)); + + File segmentFile1 = manager.getSegmentFiles(segmentToDownload4); + Assert.assertTrue(segmentFile1.getAbsolutePath().contains("/local_storage_folder2/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload4)); + + } + } From d60ab71d622869d861c775d1cdc95a5bd50c8df0 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Sun, 4 Aug 2019 23:02:12 +0530 Subject: [PATCH 11/29] to trigger CI build From f60e0c8693932879b23788e0fcf837e029e42146 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Mon, 5 Aug 2019 08:28:34 +0530 Subject: [PATCH 12/29] Add documentation for the selection strategy configuration --- docs/content/configuration/index.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 0e9e6633ce27..2da5cc5f0ee6 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -1263,6 +1263,7 @@ These Historical configurations can be defined in the `historical/runtime.proper |Property|Description|Default| |--------|-----------|-------| |`druid.segmentCache.locations`|Segments assigned to a Historical process are first stored on the local file system (in a disk cache) and then served by the Historical process. These locations define where that local cache resides. This value cannot be NULL or EMPTY. Here is an example `druid.segmentCache.locations=[{"path": "/mnt/druidSegments", "maxSize": 10000, "freeSpacePercent": 1.0}]`. "freeSpacePercent" is optional, if provided then enforces that much of free disk partition space while storing segments. But, it depends on File.getTotalSpace() and File.getFreeSpace() methods, so enable if only if they work for your File System.| none | +|`druid.segmentCache.locationSelectorStrategy`|The strategy used to select a location from the configured `druid.segmentCache.locations` for optimal segment distribution. Possible values are `leastBytesUsed` or `roundRobin`. The leastBytesUsed strategy always selects a location which has least bytes used in absolute terms.The roundRobin strategy selects a location in a round robin fashion oblivious to the bytes used or the capacity. |leastBytesUsed| |`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a process is no longer serving a segment.|true| |`druid.segmentCache.dropSegmentDelayMillis`|How long a process delays before completely dropping segment.|30000 (30 seconds)| |`druid.segmentCache.infoDir`|Historical processes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir| From 53c56bd465514c3d1ee5c1321f545427c0d4f893 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Mon, 5 Aug 2019 15:34:03 +0530 Subject: [PATCH 13/29] to re trigger CI build From e930cd760d6f0b967f3bd2190654eae414b0cc12 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Sat, 24 Aug 2019 23:36:38 +0530 Subject: [PATCH 14/29] updated docs as per review comments, made LeastBytesUsedStorageLocationSelectorStrategy.getLocations a synchronzied method, other minor fixes --- docs/content/configuration/index.md | 5 +++- ...esUsedStorageLocationSelectorStrategy.java | 18 ++++++++++---- ...dRobinStorageLocationSelectorStrategy.java | 15 ++++++++---- .../segment/loading/SegmentLoaderConfig.java | 24 ++++++++++++------- .../SegmentLoaderLocalCacheManager.java | 13 ++++------ .../StorageLocationSelectorStrategy.java | 19 +++++---------- .../SegmentLoaderLocalCacheManagerTest.java | 23 +++++++++++++----- 7 files changed, 72 insertions(+), 45 deletions(-) diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 19514874f4a6..90a6938f497e 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -1265,7 +1265,7 @@ These Historical configurations can be defined in the `historical/runtime.proper |Property|Description|Default| |--------|-----------|-------| |`druid.segmentCache.locations`|Segments assigned to a Historical process are first stored on the local file system (in a disk cache) and then served by the Historical process. These locations define where that local cache resides. This value cannot be NULL or EMPTY. Here is an example `druid.segmentCache.locations=[{"path": "/mnt/druidSegments", "maxSize": 10000, "freeSpacePercent": 1.0}]`. "freeSpacePercent" is optional, if provided then enforces that much of free disk partition space while storing segments. But, it depends on File.getTotalSpace() and File.getFreeSpace() methods, so enable if only if they work for your File System.| none | -|`druid.segmentCache.locationSelectorStrategy`|The strategy used to select a location from the configured `druid.segmentCache.locations` for optimal segment distribution. Possible values are `leastBytesUsed` or `roundRobin`. The leastBytesUsed strategy always selects a location which has least bytes used in absolute terms.The roundRobin strategy selects a location in a round robin fashion oblivious to the bytes used or the capacity. |leastBytesUsed| +|`druid.segmentCache.locationSelectorStrategy`|The strategy used to select a location from the configured `druid.segmentCache.locations` for segment distribution. Possible values are `leastBytesUsed` or `roundRobin`. |leastBytesUsed| |`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a process is no longer serving a segment.|true| |`druid.segmentCache.dropSegmentDelayMillis`|How long a process delays before completely dropping segment.|30000 (30 seconds)| |`druid.segmentCache.infoDir`|Historical processes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir| @@ -1275,6 +1275,9 @@ These Historical configurations can be defined in the `historical/runtime.proper In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise. +In `druid.segmentCache.locationSelectorStrategy`, one of leastBytesUsed or roundRobin could be specified to represent the strategy to distribute segments across multiple segment cache locations. The leastBytesUsed which is the default strategy always selects a location which has least bytes used in absolute terms. The roundRobin strategy selects a location in a round robin fashion oblivious to the bytes used or the capacity. + + #### Historical Query Configs ##### Concurrent Requests diff --git a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java index ab6430068ab8..5a3693e0a103 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java @@ -19,12 +19,12 @@ package org.apache.druid.segment.loading; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; import org.apache.druid.timeline.DataSegment; import java.util.Comparator; import java.util.Iterator; +import java.util.List; /** * A {@link StorageLocation} selector strategy that selects a segment cache location that is least filled each time @@ -35,17 +35,25 @@ public class LeastBytesUsedStorageLocationSelectorStrategy implements StorageLoc private static final Comparator COMPARATOR = Comparator .comparingLong(StorageLocation::currSizeBytes); - private ImmutableList storageLocations; + private List storageLocations; + + public LeastBytesUsedStorageLocationSelectorStrategy(List storageLocations) + { + this.storageLocations = storageLocations; + } @Override - public Iterator getLocations(DataSegment dataSegment, String storageDirStr) + public synchronized Iterator getLocations(DataSegment dataSegment, String storageDirStr) { return Ordering.from(COMPARATOR).sortedCopy(this.storageLocations).iterator(); } @Override - public void setStorageLocations(ImmutableList storageLocations) + public String toString() { - this.storageLocations = storageLocations; + return "LeastBytesUsedStorageLocationSelectorStrategy{" + + "comparator=" + COMPARATOR + + ", storageLocations=" + storageLocations + + '}'; } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java index 2125e213c9dd..fc632c6af0ed 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java @@ -19,11 +19,11 @@ package org.apache.druid.segment.loading; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import org.apache.druid.timeline.DataSegment; import java.util.Iterator; +import java.util.List; /** * A {@link StorageLocation} selector strategy that selects a segment cache location in a round-robin fashion each time @@ -33,11 +33,10 @@ public class RoundRobinStorageLocationSelectorStrategy implements StorageLocatio { private Iterator cyclicIterator; - @Override - public void setStorageLocations(ImmutableList storageLocations) + public RoundRobinStorageLocationSelectorStrategy(List storageLocations) { // cyclicIterator remembers the marker internally - cyclicIterator = Iterators.cycle(storageLocations); + this.cyclicIterator = Iterators.cycle(storageLocations); } @Override @@ -45,4 +44,12 @@ public Iterator getLocations(DataSegment dataSegment, String st { return cyclicIterator; } + + @Override + public String toString() + { + return "RoundRobinStorageLocationSelectorStrategy{" + + "cyclicIterator=" + cyclicIterator + + '}'; + } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index 4ea10e63e286..0cd0dad88302 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -54,8 +54,7 @@ public class SegmentLoaderConfig private Integer numBootstrapThreads = null; @JsonProperty("locationSelectorStrategy") - private StorageLocationSelectorStrategy locationSelectorStrategy = - new LeastBytesUsedStorageLocationSelectorStrategy(); // default strategy if no strategy is specified in the config + private StorageLocationSelectorStrategy locationSelectorStrategy; @JsonProperty private File infoDir = null; @@ -93,24 +92,32 @@ public int getNumBootstrapThreads() return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads; } - public StorageLocationSelectorStrategy getStorageLocationSelectorStrategy() + public StorageLocationSelectorStrategy getStorageLocationSelectorStrategy(List storageLocations) { + if (locationSelectorStrategy == null) { + checkLocationConfigForNull(); + // default strategy if no strategy is specified in the config + locationSelectorStrategy = new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations); + } return locationSelectorStrategy; } public File getInfoDir() { if (infoDir == null) { - - if (locations == null || locations.size() == 0) { - throw new ISE("You have no segment cache locations defined. Please configure druid.segmentCache.locations to use one or more locations."); - } + checkLocationConfigForNull(); infoDir = new File(locations.get(0).getPath(), "info_dir"); } - return infoDir; } + private void checkLocationConfigForNull() + { + if (locations == null || locations.size() == 0) { + throw new ISE("You have no segment cache locations defined. Please configure druid.segmentCache.locations to use one or more locations."); + } + } + public int getStatusQueueMaxSize() { return statusQueueMaxSize; @@ -139,6 +146,7 @@ public String toString() "locations=" + locations + ", deleteOnRemove=" + deleteOnRemove + ", dropSegmentDelayMillis=" + dropSegmentDelayMillis + + ", locationSelectorStrategy=" + locationSelectorStrategy + ", infoDir=" + infoDir + '}'; } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 073b78043cc3..7be5c638dd48 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import org.apache.commons.io.FileUtils; import org.apache.druid.guice.annotations.Json; @@ -101,8 +100,7 @@ public SegmentLoaderLocalCacheManager( ) ); } - this.strategy = config.getStorageLocationSelectorStrategy(); - this.strategy.setStorageLocations(ImmutableList.copyOf(locations)); + this.strategy = config.getStorageLocationSelectorStrategy(locations); } @Override @@ -191,8 +189,7 @@ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storage // indefinitely. File storageDir = loc.reserve(storageDirStr, segment); - - if (null != storageDir) { + if (storageDir != null) { try { loadInLocationWithStartMarker(segment, storageDir); return loc; @@ -200,9 +197,9 @@ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storage catch (SegmentLoadingException e) { try { log.makeAlert( - e, - "Failed to load segment in current location %s, try next location if any", - loc.getPath().getAbsolutePath() + e, + "Failed to load segment in current location [%s], try next location if any", + loc.getPath().getAbsolutePath() ).addData("location", loc.getPath().getAbsolutePath()).emit(); } finally { diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java index 9b4d21183b58..f3c34a088bea 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java @@ -21,14 +21,13 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.common.collect.ImmutableList; import org.apache.druid.timeline.DataSegment; import java.util.Iterator; /** * This interface describes the storage location selection strategy which is responsible for ordering the - * available multiple {@link StorageLocation}s for optimal segment distribution. + * available multiple {@link StorageLocation}s for segment distribution. */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "tier", defaultImpl = LeastBytesUsedStorageLocationSelectorStrategy.class) @JsonSubTypes(value = { @@ -38,19 +37,13 @@ public interface StorageLocationSelectorStrategy { /** - * Finds the best ordering of the {@link StorageLocation}s to load the given {@link DataSegment} into according to - * the location selector strategy. This method returns an iterator instead of a single best location. The - * caller is responsible for iterating over the locations and calling {@link StorageLocation}'s reserve() method. - * This is because a single location may be problematic like failed disk or might become unwritable for whatever - * reasons. + * Finds the best ordering of the {@link StorageLocation}s to load the given {@link DataSegment} into according to + * the location selector strategy. This method returns an iterator instead of a single best location. The + * caller is responsible for iterating over the locations and calling {@link StorageLocation#reserve} + * method. This is because a single location may be problematic like failed disk or might become unwritable for + * whatever reasons. * * @return An iterator of {@link StorageLocation}s from which the callers can iterate and pick a location. */ Iterator getLocations(DataSegment dataSegment, String storageDirStr); - - /** - * Sets the storage locations list with the supplied storage locations. - * @param storageLocations storage locations list to be used. - */ - void setStorageLocations(ImmutableList storageLocations); } diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index 43f2c09623df..7d23eebd005c 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -401,18 +401,29 @@ private DataSegment dataSegmentWithInterval(String intervalStr, long size) @Test public void testSegmentDistributionUsingRoundRobinStrategy() throws Exception { - final List locations = new ArrayList<>(); + final List locationConfigs = new ArrayList<>(); final StorageLocationConfig locationConfig = createStorageLocationConfig("local_storage_folder", 10000000000L, true); final StorageLocationConfig locationConfig2 = createStorageLocationConfig("local_storage_folder2", 1000000000L, true); final StorageLocationConfig locationConfig3 = createStorageLocationConfig("local_storage_folder3", 1000000000L, true); - locations.add(locationConfig); - locations.add(locationConfig2); - locations.add(locationConfig3); + locationConfigs.add(locationConfig); + locationConfigs.add(locationConfig2); + locationConfigs.add(locationConfig3); + + List locations = new ArrayList<>(); + for (StorageLocationConfig locConfig : locationConfigs) { + locations.add( + new StorageLocation( + locConfig.getPath(), + locConfig.getMaxSize(), + locConfig.getFreeSpacePercent() + ) + ); + } manager = new SegmentLoaderLocalCacheManager( TestHelper.getTestIndexIO(), - new SegmentLoaderConfig().withLocations(locations).withStorageLocationSelectorStrategy( - new RoundRobinStorageLocationSelectorStrategy() + new SegmentLoaderConfig().withLocations(locationConfigs).withStorageLocationSelectorStrategy( + new RoundRobinStorageLocationSelectorStrategy(locations) ), jsonMapper ); From d66c25dbf0211d063f09cb88aa05b3a6b901713b Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Sun, 25 Aug 2019 11:56:54 +0530 Subject: [PATCH 15/29] In checkLocationConfigForNull method, using getLocations() to check for null instead of directly referring to the locations variable so that tests overriding getLocations() method do not fail --- .../org/apache/druid/segment/loading/SegmentLoaderConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index 0cd0dad88302..458184a7a37c 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -113,7 +113,7 @@ public File getInfoDir() private void checkLocationConfigForNull() { - if (locations == null || locations.size() == 0) { + if (getLocations() == null || getLocations().size() == 0) { throw new ISE("You have no segment cache locations defined. Please configure druid.segmentCache.locations to use one or more locations."); } } From 65dedb2aff630cb1ca2ee6205713e10c77d969b4 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Fri, 20 Sep 2019 00:04:39 +0530 Subject: [PATCH 16/29] Implementing review comments. Added tests for StorageLocationSelectorStrategy --- ...esUsedStorageLocationSelectorStrategy.java | 11 +- ...dRobinStorageLocationSelectorStrategy.java | 5 +- .../SegmentLoaderLocalCacheManager.java | 8 +- .../StorageLocationSelectorStrategy.java | 4 +- .../SegmentLoaderLocalCacheManagerTest.java | 34 +++--- .../StorageLocationSelectorStrategyTest.java | 107 ++++++++++++++++++ 6 files changed, 137 insertions(+), 32 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java diff --git a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java index 5a3693e0a103..80b340b098a7 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java @@ -20,7 +20,6 @@ package org.apache.druid.segment.loading; import com.google.common.collect.Ordering; -import org.apache.druid.timeline.DataSegment; import java.util.Comparator; import java.util.Iterator; @@ -32,8 +31,8 @@ */ public class LeastBytesUsedStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy { - private static final Comparator COMPARATOR = Comparator - .comparingLong(StorageLocation::currSizeBytes); + private static final Ordering ORDERING = Ordering.from(Comparator + .comparingLong(StorageLocation::currSizeBytes)); private List storageLocations; @@ -43,16 +42,16 @@ public LeastBytesUsedStorageLocationSelectorStrategy(List stora } @Override - public synchronized Iterator getLocations(DataSegment dataSegment, String storageDirStr) + public Iterator getLocations() { - return Ordering.from(COMPARATOR).sortedCopy(this.storageLocations).iterator(); + return ORDERING.sortedCopy(this.storageLocations).iterator(); } @Override public String toString() { return "LeastBytesUsedStorageLocationSelectorStrategy{" + - "comparator=" + COMPARATOR + + "ordering=" + ORDERING + ", storageLocations=" + storageLocations + '}'; } diff --git a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java index fc632c6af0ed..d3b4fda20670 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java @@ -20,7 +20,6 @@ package org.apache.druid.segment.loading; import com.google.common.collect.Iterators; -import org.apache.druid.timeline.DataSegment; import java.util.Iterator; import java.util.List; @@ -40,9 +39,9 @@ public RoundRobinStorageLocationSelectorStrategy(List storageLo } @Override - public Iterator getLocations(DataSegment dataSegment, String storageDirStr) + public Iterator getLocations() { - return cyclicIterator; + return this.cyclicIterator; } @Override diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 7be5c638dd48..97f67690c27f 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -177,14 +177,14 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException * * Locations are fetched using {@link StorageLocationSelectorStrategy}. */ - private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException + private synchronized StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { - Iterator locations = strategy.getLocations(segment, storageDirStr); + Iterator locationsIterator = strategy.getLocations(); int numLocationsToTry = this.locations.size(); - while (locations.hasNext() && numLocationsToTry > 0) { + while (locationsIterator.hasNext() && numLocationsToTry > 0) { - StorageLocation loc = locations.next(); + StorageLocation loc = locationsIterator.next(); numLocationsToTry--; // This is to avoid the cyclic iterator returned from Round Robin strategy to loop // indefinitely. diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java index f3c34a088bea..d38885b8333a 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java @@ -37,7 +37,7 @@ public interface StorageLocationSelectorStrategy { /** - * Finds the best ordering of the {@link StorageLocation}s to load the given {@link DataSegment} into according to + * Finds the best ordering of the {@link StorageLocation}s to load a {@link DataSegment} according to * the location selector strategy. This method returns an iterator instead of a single best location. The * caller is responsible for iterating over the locations and calling {@link StorageLocation#reserve} * method. This is because a single location may be problematic like failed disk or might become unwritable for @@ -45,5 +45,5 @@ public interface StorageLocationSelectorStrategy * * @return An iterator of {@link StorageLocation}s from which the callers can iterate and pick a location. */ - Iterator getLocations(DataSegment dataSegment, String storageDirStr); + Iterator getLocations(); } diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index 7d23eebd005c..5b2b1a5f6024 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -379,23 +379,23 @@ private DataSegment dataSegmentWithInterval(String intervalStr) private DataSegment dataSegmentWithInterval(String intervalStr, long size) { return DataSegment.builder() - .dataSource("test_segment_loader") - .interval(Intervals.of(intervalStr)) - .loadSpec( - ImmutableMap.of( - "type", - "local", - "path", - "somewhere" - ) - ) - .version("2015-05-27T03:38:35.683Z") - .dimensions(ImmutableList.of()) - .metrics(ImmutableList.of()) - .shardSpec(NoneShardSpec.instance()) - .binaryVersion(9) - .size(size) - .build(); + .dataSource("test_segment_loader") + .interval(Intervals.of(intervalStr)) + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + "somewhere" + ) + ) + .version("2015-05-27T03:38:35.683Z") + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) + .shardSpec(NoneShardSpec.instance()) + .binaryVersion(9) + .size(size) + .build(); } @Test diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java new file mode 100644 index 000000000000..b8c1e9d36121 --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class StorageLocationSelectorStrategyTest { + + @Rule + public final TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testLeastBytesUsedLocationSelectorStrategy() throws Exception { + + List storageLocations = new ArrayList<>(); + + final File localStorageFolder1 = tmpFolder.newFolder("local_storage_folder_1"); + final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder_2"); + final File localStorageFolder3 = tmpFolder.newFolder("local_storage_folder_3"); + + StorageLocation storageLocation1 = new StorageLocation(localStorageFolder1, 10000000000L, null); + storageLocations.add(storageLocation1); + storageLocations.add(new StorageLocation(localStorageFolder2,10000000000L,null)); + storageLocations.add(new StorageLocation(localStorageFolder3,10000000000L,null)); + + StorageLocationSelectorStrategy leastBytesUsedStrategy = + new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations); + + storageLocation1.reserve("tmp_loc1", "__seg1", 1024L); + + Iterator locations = leastBytesUsedStrategy.getLocations(); + + StorageLocation loc1 = locations.next(); + Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_2", + loc1.getPath().equals(localStorageFolder2)); + + StorageLocation loc2 = locations.next(); + Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_3", + loc2.getPath().equals(localStorageFolder3)); + + StorageLocation loc3 = locations.next(); + System.out.println(loc3); + Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_3", + loc3.getPath().equals(localStorageFolder1)); + + } + + @Test + public void testRoundRobinLocationSelectorStrategy() throws Exception { + + List storageLocations = new ArrayList<>(); + + final File localStorageFolder1 = tmpFolder.newFolder("local_storage_folder_1"); + final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder_2"); + final File localStorageFolder3 = tmpFolder.newFolder("local_storage_folder_3"); + + storageLocations.add(new StorageLocation(localStorageFolder1,10000000000L, null)); + storageLocations.add(new StorageLocation(localStorageFolder2,10000000000L,null)); + storageLocations.add(new StorageLocation(localStorageFolder3,10000000000L,null)); + + StorageLocationSelectorStrategy roundRobinStrategy = new RoundRobinStorageLocationSelectorStrategy(storageLocations); + + + Iterator locations = roundRobinStrategy.getLocations(); + StorageLocation loc1 = locations.next(); + Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_1", + loc1.getPath().equals(localStorageFolder1)); + + StorageLocation loc2 = locations.next(); + Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_2", + loc2.getPath().equals(localStorageFolder2)); + + StorageLocation loc3 = locations.next(); + Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_3", + loc3.getPath().equals(localStorageFolder3)); + + StorageLocation loc4 = locations.next(); + Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_1", + loc4.getPath().equals(localStorageFolder1)); + } + +} From 039683a91d35fe69066898985dbb41af39a8a2e6 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Fri, 20 Sep 2019 00:22:17 +0530 Subject: [PATCH 17/29] Checkstyle fixes --- ...esUsedStorageLocationSelectorStrategy.java | 2 +- .../StorageLocationSelectorStrategyTest.java | 38 ++++++++++--------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java index 80b340b098a7..dbf19c14d980 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java @@ -32,7 +32,7 @@ public class LeastBytesUsedStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy { private static final Ordering ORDERING = Ordering.from(Comparator - .comparingLong(StorageLocation::currSizeBytes)); + .comparingLong(StorageLocation::currSizeBytes)); private List storageLocations; diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java index b8c1e9d36121..0b917fb0d9d7 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java @@ -29,13 +29,15 @@ import java.util.Iterator; import java.util.List; -public class StorageLocationSelectorStrategyTest { +public class StorageLocationSelectorStrategyTest +{ @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); @Test - public void testLeastBytesUsedLocationSelectorStrategy() throws Exception { + public void testLeastBytesUsedLocationSelectorStrategy() throws Exception + { List storageLocations = new ArrayList<>(); @@ -43,13 +45,14 @@ public void testLeastBytesUsedLocationSelectorStrategy() throws Exception { final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder_2"); final File localStorageFolder3 = tmpFolder.newFolder("local_storage_folder_3"); - StorageLocation storageLocation1 = new StorageLocation(localStorageFolder1, 10000000000L, null); + StorageLocation storageLocation1 = new StorageLocation(localStorageFolder1, 10000000000L, + null); storageLocations.add(storageLocation1); - storageLocations.add(new StorageLocation(localStorageFolder2,10000000000L,null)); - storageLocations.add(new StorageLocation(localStorageFolder3,10000000000L,null)); + storageLocations.add(new StorageLocation(localStorageFolder2, 10000000000L, null)); + storageLocations.add(new StorageLocation(localStorageFolder3, 10000000000L, null)); StorageLocationSelectorStrategy leastBytesUsedStrategy = - new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations); + new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations); storageLocation1.reserve("tmp_loc1", "__seg1", 1024L); @@ -57,21 +60,22 @@ public void testLeastBytesUsedLocationSelectorStrategy() throws Exception { StorageLocation loc1 = locations.next(); Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_2", - loc1.getPath().equals(localStorageFolder2)); + loc1.getPath().equals(localStorageFolder2)); StorageLocation loc2 = locations.next(); Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_3", - loc2.getPath().equals(localStorageFolder3)); + loc2.getPath().equals(localStorageFolder3)); StorageLocation loc3 = locations.next(); System.out.println(loc3); Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_3", - loc3.getPath().equals(localStorageFolder1)); + loc3.getPath().equals(localStorageFolder1)); } @Test - public void testRoundRobinLocationSelectorStrategy() throws Exception { + public void testRoundRobinLocationSelectorStrategy() throws Exception + { List storageLocations = new ArrayList<>(); @@ -79,9 +83,9 @@ public void testRoundRobinLocationSelectorStrategy() throws Exception { final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder_2"); final File localStorageFolder3 = tmpFolder.newFolder("local_storage_folder_3"); - storageLocations.add(new StorageLocation(localStorageFolder1,10000000000L, null)); - storageLocations.add(new StorageLocation(localStorageFolder2,10000000000L,null)); - storageLocations.add(new StorageLocation(localStorageFolder3,10000000000L,null)); + storageLocations.add(new StorageLocation(localStorageFolder1, 10000000000L, null)); + storageLocations.add(new StorageLocation(localStorageFolder2, 10000000000L, null)); + storageLocations.add(new StorageLocation(localStorageFolder3, 10000000000L, null)); StorageLocationSelectorStrategy roundRobinStrategy = new RoundRobinStorageLocationSelectorStrategy(storageLocations); @@ -89,19 +93,19 @@ public void testRoundRobinLocationSelectorStrategy() throws Exception { Iterator locations = roundRobinStrategy.getLocations(); StorageLocation loc1 = locations.next(); Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_1", - loc1.getPath().equals(localStorageFolder1)); + loc1.getPath().equals(localStorageFolder1)); StorageLocation loc2 = locations.next(); Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_2", - loc2.getPath().equals(localStorageFolder2)); + loc2.getPath().equals(localStorageFolder2)); StorageLocation loc3 = locations.next(); Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_3", - loc3.getPath().equals(localStorageFolder3)); + loc3.getPath().equals(localStorageFolder3)); StorageLocation loc4 = locations.next(); Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_1", - loc4.getPath().equals(localStorageFolder1)); + loc4.getPath().equals(localStorageFolder1)); } } From 25af782ec1e007ed007a51f4758bcb712a84e656 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Fri, 20 Sep 2019 00:34:16 +0530 Subject: [PATCH 18/29] Adding java doc comments for StorageLocationSelectorStrategy interface --- .../segment/loading/StorageLocationSelectorStrategy.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java index d38885b8333a..07ca0ede27b8 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java @@ -28,6 +28,12 @@ /** * This interface describes the storage location selection strategy which is responsible for ordering the * available multiple {@link StorageLocation}s for segment distribution. + * + * Only a snapshot of the locations is returned here. The implemntations currently do not handle all kinds of + * concurrency issues and accesses to the underlying storage. Please see + * https://github.com/apache/incubator-druid/pull/8038#discussion_r325520829 of PR https://github + * .com/apache/incubator-druid/pull/8038 for more details. + * */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "tier", defaultImpl = LeastBytesUsedStorageLocationSelectorStrategy.class) @JsonSubTypes(value = { From f03b71c2364834d903e921b78f2a58f32d87566a Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Fri, 20 Sep 2019 00:35:05 +0530 Subject: [PATCH 19/29] checkstyle --- .../druid/segment/loading/StorageLocationSelectorStrategy.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java index 07ca0ede27b8..1605adf9e6b7 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java @@ -33,7 +33,6 @@ * concurrency issues and accesses to the underlying storage. Please see * https://github.com/apache/incubator-druid/pull/8038#discussion_r325520829 of PR https://github * .com/apache/incubator-druid/pull/8038 for more details. - * */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "tier", defaultImpl = LeastBytesUsedStorageLocationSelectorStrategy.class) @JsonSubTypes(value = { From 5b9ab18253132acbea6f969ca399b7efa7b58df5 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Fri, 20 Sep 2019 10:07:23 +0530 Subject: [PATCH 20/29] empty commit to retrigger build From 78dae2e46dce4105ba430ae769c15f5148c83b82 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Fri, 20 Sep 2019 19:02:00 +0530 Subject: [PATCH 21/29] Empty commit From 1b81d48e5e6a6ccfba153619f24b45af6e7ffe98 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Fri, 20 Sep 2019 22:36:44 +0530 Subject: [PATCH 22/29] Adding suppressions for words leastBytesUsed and roundRobin of ../docs/configuration/index.md file --- website/.spelling | 2 ++ 1 file changed, 2 insertions(+) diff --git a/website/.spelling b/website/.spelling index 5e4553a04d87..a892bb119838 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1547,6 +1547,7 @@ java.class.path java.io.tmpdir javaOpts javaOptsArray +leastBytesUsed loadList loadqueuepeon loadspec @@ -1570,6 +1571,7 @@ queryType remoteTaskRunnerConfig rendezvousHash resultsets +roundRobin runtime.properties runtime.properties. s3 From 7c72d113448f102a0b32990ade9d011ab9e352b1 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Tue, 24 Sep 2019 21:39:44 +0530 Subject: [PATCH 23/29] Impl review comments including updating docs as suggested --- docs/configuration/index.md | 2 +- ...esUsedStorageLocationSelectorStrategy.java | 3 +- .../SegmentLoaderLocalCacheManager.java | 2 +- .../StorageLocationSelectorStrategyTest.java | 29 +++++++++---------- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index ee329929561c..69652788caec 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1297,7 +1297,7 @@ These Historical configurations can be defined in the `historical/runtime.proper In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise. -In `druid.segmentCache.locationSelectorStrategy`, one of leastBytesUsed or roundRobin could be specified to represent the strategy to distribute segments across multiple segment cache locations. The leastBytesUsed which is the default strategy always selects a location which has least bytes used in absolute terms. The roundRobin strategy selects a location in a round robin fashion oblivious to the bytes used or the capacity. +In `druid.segmentCache.locationSelectorStrategy`, one of leastBytesUsed or roundRobin could be specified to represent the strategy to distribute segments across multiple segment cache locations. The leastBytesUsed which is the default strategy always selects a location which has least bytes used in absolute terms. The roundRobin strategy selects a location in a round robin fashion oblivious to the bytes used or the capacity. Note that `if druid.segmentCache.numLoadingThreads` > 1, multiple threads can download different segments at the same time. In this case, with the leastBytesUsed strategy, historicals may select a sub-optimal storage location because each decision is based on a snapshot of the storage location status of when a segment is requested to download. #### Historical query configs diff --git a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java index dbf19c14d980..1cbcf2e5158e 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java @@ -51,8 +51,7 @@ public Iterator getLocations() public String toString() { return "LeastBytesUsedStorageLocationSelectorStrategy{" + - "ordering=" + ORDERING + - ", storageLocations=" + storageLocations + + "storageLocations=" + storageLocations + '}'; } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 97f67690c27f..b46ffe14578c 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -177,7 +177,7 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException * * Locations are fetched using {@link StorageLocationSelectorStrategy}. */ - private synchronized StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException + private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { Iterator locationsIterator = strategy.getLocations(); int numLocationsToTry = this.locations.size(); diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java index 0b917fb0d9d7..84c3704254c4 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java @@ -59,17 +59,16 @@ public void testLeastBytesUsedLocationSelectorStrategy() throws Exception Iterator locations = leastBytesUsedStrategy.getLocations(); StorageLocation loc1 = locations.next(); - Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_2", - loc1.getPath().equals(localStorageFolder2)); + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_2", + localStorageFolder2, loc1.getPath()); StorageLocation loc2 = locations.next(); - Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_3", - loc2.getPath().equals(localStorageFolder3)); + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_3", + localStorageFolder3, loc2.getPath()); StorageLocation loc3 = locations.next(); - System.out.println(loc3); - Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_3", - loc3.getPath().equals(localStorageFolder1)); + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1", + localStorageFolder1, loc3.getPath()); } @@ -92,20 +91,20 @@ public void testRoundRobinLocationSelectorStrategy() throws Exception Iterator locations = roundRobinStrategy.getLocations(); StorageLocation loc1 = locations.next(); - Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_1", - loc1.getPath().equals(localStorageFolder1)); + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1", + localStorageFolder1, loc1.getPath()); StorageLocation loc2 = locations.next(); - Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_2", - loc2.getPath().equals(localStorageFolder2)); + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_2", + localStorageFolder2, loc2.getPath()); StorageLocation loc3 = locations.next(); - Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_3", - loc3.getPath().equals(localStorageFolder3)); + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_3", + localStorageFolder3, loc3.getPath()); StorageLocation loc4 = locations.next(); - Assert.assertTrue("The next element of the iterator should point to path local_storage_folder_1", - loc4.getPath().equals(localStorageFolder1)); + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1", + localStorageFolder1, loc4.getPath()); } } From 4cac096d08ac912e940c1a2e443c4e11bad8a7cb Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Tue, 24 Sep 2019 22:47:23 +0530 Subject: [PATCH 24/29] Removing checkLocationConfigForNull(), @NotEmpty annotation serves the purpose --- .../druid/segment/loading/SegmentLoaderConfig.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index 458184a7a37c..5d3d88881169 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -95,7 +95,6 @@ public int getNumBootstrapThreads() public StorageLocationSelectorStrategy getStorageLocationSelectorStrategy(List storageLocations) { if (locationSelectorStrategy == null) { - checkLocationConfigForNull(); // default strategy if no strategy is specified in the config locationSelectorStrategy = new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations); } @@ -105,19 +104,11 @@ public StorageLocationSelectorStrategy getStorageLocationSelectorStrategy(List Date: Wed, 25 Sep 2019 23:17:41 +0530 Subject: [PATCH 25/29] Round robin iterator to keep track of the no. of iterations, impl review comments, added tests for round robin strategy --- ...esUsedStorageLocationSelectorStrategy.java | 2 +- ...dRobinStorageLocationSelectorStrategy.java | 48 +++++++++++++------ .../segment/loading/SegmentLoaderConfig.java | 1 - .../StorageLocationSelectorStrategy.java | 4 +- .../StorageLocationSelectorStrategyTest.java | 45 +++++++++++++---- 5 files changed, 75 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java index 1cbcf2e5158e..e1e94185c50a 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LeastBytesUsedStorageLocationSelectorStrategy.java @@ -42,7 +42,7 @@ public LeastBytesUsedStorageLocationSelectorStrategy(List stora } @Override - public Iterator getLocations() + public Iterator getLocations() { return ORDERING.sortedCopy(this.storageLocations).iterator(); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java index d3b4fda20670..45b436c1dfd9 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java @@ -19,10 +19,10 @@ package org.apache.druid.segment.loading; -import com.google.common.collect.Iterators; - import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicInteger; /** * A {@link StorageLocation} selector strategy that selects a segment cache location in a round-robin fashion each time @@ -30,25 +30,45 @@ */ public class RoundRobinStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy { - private Iterator cyclicIterator; + + private final List storageLocations; + private final AtomicInteger startIndex = new AtomicInteger(0); public RoundRobinStorageLocationSelectorStrategy(List storageLocations) { - // cyclicIterator remembers the marker internally - this.cyclicIterator = Iterators.cycle(storageLocations); + this.storageLocations = storageLocations; } @Override - public Iterator getLocations() + public Iterator getLocations() { - return this.cyclicIterator; - } + return new Iterator() { - @Override - public String toString() - { - return "RoundRobinStorageLocationSelectorStrategy{" + - "cyclicIterator=" + cyclicIterator + - '}'; + private final int numStorageLocations = storageLocations.size(); + private int remainingIterations = numStorageLocations; + private int i = startIndex.getAndUpdate(n -> (n + 1) % numStorageLocations); + + @Override + public boolean hasNext() + { + return remainingIterations > 0; + } + + @Override + public StorageLocation next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + remainingIterations--; + final StorageLocation nextLocation = storageLocations.get(i++); + if (i == numStorageLocations) { + i = 0; + startIndex.set(0); + } + return nextLocation; + } + }; } + } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index 5d3d88881169..58eb77317e5b 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.utils.JvmUtils; import org.hibernate.validator.constraints.NotEmpty; diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java index 1605adf9e6b7..fdbd73f10bdc 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java @@ -48,7 +48,9 @@ public interface StorageLocationSelectorStrategy * method. This is because a single location may be problematic like failed disk or might become unwritable for * whatever reasons. * + * This method can be called by different threads and so should be thread-safe. + * * @return An iterator of {@link StorageLocation}s from which the callers can iterate and pick a location. */ - Iterator getLocations(); + Iterator getLocations(); } diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java index 84c3704254c4..aa7132bb2740 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java @@ -73,9 +73,31 @@ public void testLeastBytesUsedLocationSelectorStrategy() throws Exception } @Test - public void testRoundRobinLocationSelectorStrategy() throws Exception + public void testRoundRobinLocationSelectorStrategySingleLocation() throws Exception { + List storageLocations = new ArrayList<>(); + final File localStorageFolder1 = tmpFolder.newFolder("local_storage_folder_1"); + storageLocations.add(new StorageLocation(localStorageFolder1, 10000000000L, null)); + StorageLocationSelectorStrategy roundRobinStrategy = + new RoundRobinStorageLocationSelectorStrategy(storageLocations); + + Iterator locations = roundRobinStrategy.getLocations(); + + StorageLocation loc1 = locations.next(); + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1", + localStorageFolder1, loc1.getPath()); + + locations = roundRobinStrategy.getLocations(); + + StorageLocation loc2 = locations.next(); + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1", + localStorageFolder1, loc2.getPath()); + } + + @Test + public void testRoundRobinLocationSelectorStrategy() throws Exception + { List storageLocations = new ArrayList<>(); final File localStorageFolder1 = tmpFolder.newFolder("local_storage_folder_1"); @@ -88,23 +110,30 @@ public void testRoundRobinLocationSelectorStrategy() throws Exception StorageLocationSelectorStrategy roundRobinStrategy = new RoundRobinStorageLocationSelectorStrategy(storageLocations); + iterateLocs(localStorageFolder1, localStorageFolder2, localStorageFolder3, roundRobinStrategy); + iterateLocs(localStorageFolder1, localStorageFolder2, localStorageFolder3, roundRobinStrategy); + iterateLocs(localStorageFolder1, localStorageFolder2, localStorageFolder3, roundRobinStrategy); + iterateLocs(localStorageFolder1, localStorageFolder2, localStorageFolder3, roundRobinStrategy); + iterateLocs(localStorageFolder1, localStorageFolder2, localStorageFolder3, roundRobinStrategy); + } + + private void iterateLocs(File localStorageFolder1, File localStorageFolder2, File localStorageFolder3, + StorageLocationSelectorStrategy roundRobinStrategy) + { + Iterator locations; + locations = roundRobinStrategy.getLocations(); - Iterator locations = roundRobinStrategy.getLocations(); StorageLocation loc1 = locations.next(); Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1", localStorageFolder1, loc1.getPath()); StorageLocation loc2 = locations.next(); - Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_2", + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1", localStorageFolder2, loc2.getPath()); StorageLocation loc3 = locations.next(); - Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_3", - localStorageFolder3, loc3.getPath()); - - StorageLocation loc4 = locations.next(); Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1", - localStorageFolder1, loc4.getPath()); + localStorageFolder3, loc3.getPath()); } } From 844d55db5947bab14872ba0b8ecd3c041d7586d2 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Fri, 27 Sep 2019 15:34:27 +0530 Subject: [PATCH 26/29] Fixing the round robin iterator --- .../RoundRobinStorageLocationSelectorStrategy.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java index 45b436c1dfd9..f80340f3e5e3 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java @@ -46,7 +46,6 @@ public Iterator getLocations() private final int numStorageLocations = storageLocations.size(); private int remainingIterations = numStorageLocations; - private int i = startIndex.getAndUpdate(n -> (n + 1) % numStorageLocations); @Override public boolean hasNext() @@ -61,11 +60,8 @@ public StorageLocation next() throw new NoSuchElementException(); } remainingIterations--; - final StorageLocation nextLocation = storageLocations.get(i++); - if (i == numStorageLocations) { - i = 0; - startIndex.set(0); - } + final StorageLocation nextLocation = + storageLocations.get(startIndex.getAndUpdate(n -> (n + 1) % numStorageLocations)); return nextLocation; } }; From 0c3518224aba28beffe6099e9b5edf615bf9eb17 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Fri, 27 Sep 2019 16:12:42 +0530 Subject: [PATCH 27/29] Removed numLocationsToTry, updated java docs --- .../loading/RoundRobinStorageLocationSelectorStrategy.java | 4 +++- .../segment/loading/SegmentLoaderLocalCacheManager.java | 5 +---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java index f80340f3e5e3..362fb8c0eca8 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/RoundRobinStorageLocationSelectorStrategy.java @@ -26,7 +26,9 @@ /** * A {@link StorageLocation} selector strategy that selects a segment cache location in a round-robin fashion each time - * among the available storage locations. + * among the available storage locations. When {@link Iterator#next()} on iterator retuned by + * {@link RoundRobinStorageLocationSelectorStrategy#getLocations()} is called the locations are returned in a round + * robin fashion even when multiple threads are in use. */ public class RoundRobinStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy { diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index b46ffe14578c..4a8ce98923f0 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -180,13 +180,10 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { Iterator locationsIterator = strategy.getLocations(); - int numLocationsToTry = this.locations.size(); - while (locationsIterator.hasNext() && numLocationsToTry > 0) { + while (locationsIterator.hasNext()) { StorageLocation loc = locationsIterator.next(); - numLocationsToTry--; // This is to avoid the cyclic iterator returned from Round Robin strategy to loop - // indefinitely. File storageDir = loc.reserve(storageDirStr, segment); if (storageDir != null) { From dc2bf9f4c77c10f4567fccf034f345153184c6ce Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Fri, 27 Sep 2019 23:06:55 +0530 Subject: [PATCH 28/29] changing property attribute value from tier to type --- .../druid/segment/loading/StorageLocationSelectorStrategy.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java index fdbd73f10bdc..6ac12c2222ba 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategy.java @@ -34,7 +34,8 @@ * https://github.com/apache/incubator-druid/pull/8038#discussion_r325520829 of PR https://github * .com/apache/incubator-druid/pull/8038 for more details. */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "tier", defaultImpl = LeastBytesUsedStorageLocationSelectorStrategy.class) +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = + LeastBytesUsedStorageLocationSelectorStrategy.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "leastBytesUsed", value = LeastBytesUsedStorageLocationSelectorStrategy.class), @JsonSubTypes.Type(name = "roundRobin", value = RoundRobinStorageLocationSelectorStrategy.class) From 55bc6e8d0cf68fc5c0f40fce86e39630daaa8dd3 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Sat, 28 Sep 2019 07:03:28 +0530 Subject: [PATCH 29/29] Fixing assert messages --- .../loading/StorageLocationSelectorStrategyTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java index aa7132bb2740..bfe79cafb7a0 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationSelectorStrategyTest.java @@ -120,19 +120,18 @@ public void testRoundRobinLocationSelectorStrategy() throws Exception private void iterateLocs(File localStorageFolder1, File localStorageFolder2, File localStorageFolder3, StorageLocationSelectorStrategy roundRobinStrategy) { - Iterator locations; - locations = roundRobinStrategy.getLocations(); + Iterator locations = roundRobinStrategy.getLocations(); StorageLocation loc1 = locations.next(); Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1", localStorageFolder1, loc1.getPath()); StorageLocation loc2 = locations.next(); - Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1", + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_2", localStorageFolder2, loc2.getPath()); StorageLocation loc3 = locations.next(); - Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_1", + Assert.assertEquals("The next element of the iterator should point to path local_storage_folder_3", localStorageFolder3, loc3.getPath()); }