Merged
39 commits
a619112
#7641 - Changing segment distribution algorithm to distribute segment…
t-sashidhar Jul 6, 2019
f23dac1
Merge branch 'master' into s3_firehose
t-sashidhar Jul 6, 2019
08b6256
Fixing indentation
t-sashidhar Jul 6, 2019
c2bd858
WIP
t-sashidhar Jul 11, 2019
d88c614
Merge branch 'master' into s3_firehose
t-sashidhar Jul 25, 2019
f9fa66f
Adding interface for location strategy selection, least bytes used st…
t-sashidhar Jul 25, 2019
e2efc73
Resolving merge conflicts. Adding comparator for sorting locations by…
t-sashidhar Jul 28, 2019
fd0c6d6
fixing code style
t-sashidhar Jul 28, 2019
449e929
Fixing test
t-sashidhar Jul 28, 2019
33db506
Adding a method visible only for testing, fixing tests
t-sashidhar Jul 29, 2019
2712aeb
1. Changing the method contract to return an iterator of locations in…
t-sashidhar Jul 30, 2019
c1e8597
fixing the conditional statement
t-sashidhar Jul 30, 2019
b999dba
Merge branch 'master' into s3_firehose
t-sashidhar Aug 4, 2019
66f00ee
Added testSegmentDistributionUsingLeastBytesUsedStrategy, fixed testS…
t-sashidhar Aug 4, 2019
d60ab71
to trigger CI build
t-sashidhar Aug 4, 2019
f60e0c8
Add documentation for the selection strategy configuration
t-sashidhar Aug 5, 2019
53c56bd
to re trigger CI build
t-sashidhar Aug 5, 2019
b985553
Merge branch 'master' into s3_firehose
t-sashidhar Aug 13, 2019
da21a3c
Merge branch 'master' of https://github.com/apache/incubator-druid in…
t-sashidhar Aug 18, 2019
e930cd7
updated docs as per review comments, made LeastBytesUsedStorageLocati…
t-sashidhar Aug 24, 2019
d7e0980
merging changes from master
t-sashidhar Aug 24, 2019
d66c25d
In checkLocationConfigForNull method, using getLocations() to check f…
t-sashidhar Aug 25, 2019
e309b68
Merge branch 'master' into s3_firehose
t-sashidhar Sep 7, 2019
65dedb2
Implementing review comments. Added tests for StorageLocationSelector…
t-sashidhar Sep 19, 2019
4ee0868
Merge branch 'master' into s3_firehose
t-sashidhar Sep 19, 2019
039683a
Checkstyle fixes
t-sashidhar Sep 19, 2019
25af782
Adding java doc comments for StorageLocationSelectorStrategy interface
t-sashidhar Sep 19, 2019
f03b71c
checkstyle
t-sashidhar Sep 19, 2019
5b9ab18
empty commit to retrigger build
t-sashidhar Sep 20, 2019
78dae2e
Empty commit
t-sashidhar Sep 20, 2019
1b81d48
Adding suppressions for words leastBytesUsed and roundRobin of ../doc…
t-sashidhar Sep 20, 2019
7c72d11
Impl review comments including updating docs as suggested
t-sashidhar Sep 24, 2019
e4ae066
Merge branch 'master' into s3_firehose
t-sashidhar Sep 24, 2019
4cac096
Removing checkLocationConfigForNull(), @NotEmpty annotation serves th…
t-sashidhar Sep 24, 2019
d0cffef
Round robin iterator to keep track of the no. of iterations, impl rev…
t-sashidhar Sep 25, 2019
844d55d
Fixing the round robin iterator
t-sashidhar Sep 27, 2019
0c35182
Removed numLocationsToTry, updated java docs
t-sashidhar Sep 27, 2019
dc2bf9f
changing property attribute value from tier to type
t-sashidhar Sep 27, 2019
55bc6e8
Fixing assert messages
t-sashidhar Sep 28, 2019
3 changes: 3 additions & 0 deletions docs/configuration/index.md
@@ -1286,6 +1286,7 @@ These Historical configurations can be defined in the `historical/runtime.proper
|Property|Description|Default|
|--------|-----------|-------|
|`druid.segmentCache.locations`|Segments assigned to a Historical process are first stored on the local file system (in a disk cache) and then served by the Historical process. These locations define where that local cache resides. This value cannot be NULL or EMPTY. For example: `druid.segmentCache.locations=[{"path": "/mnt/druidSegments", "maxSize": 10000, "freeSpacePercent": 1.0}]`. "freeSpacePercent" is optional; if provided, it enforces that much free disk partition space while storing segments. It depends on the File.getTotalSpace() and File.getFreeSpace() methods, so enable it only if they work for your file system.| none |
|`druid.segmentCache.locationSelectorStrategy`|The strategy used to select a location from the configured `druid.segmentCache.locations` for segment distribution. Possible values are `leastBytesUsed` or `roundRobin`. |leastBytesUsed|
|`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a process is no longer serving a segment.|true|
|`druid.segmentCache.dropSegmentDelayMillis`|How long a process delays before completely dropping segment.|30000 (30 seconds)|
|`druid.segmentCache.infoDir`|Historical processes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
@@ -1296,6 +1297,8 @@ These Historical configurations can be defined in the `historical/runtime.proper

In `druid.segmentCache.locations`, *freeSpacePercent* was added because the *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. If a Druid bug leaves unaccounted segment files on disk, or some other process writes to the same disk, this check can start failing segment loads early, before the disk fills up completely, leaving the host otherwise usable.

In `druid.segmentCache.locationSelectorStrategy`, either `leastBytesUsed` or `roundRobin` can be specified as the strategy for distributing segments across multiple segment cache locations. The `leastBytesUsed` strategy, which is the default, always selects the location with the fewest bytes used in absolute terms. The `roundRobin` strategy selects locations in round-robin order, regardless of bytes used or capacity. Note that if `druid.segmentCache.numLoadingThreads` > 1, multiple threads can download different segments at the same time. In that case, with the `leastBytesUsed` strategy, Historicals may select a sub-optimal storage location, because each decision is based on a snapshot of the storage location status taken when the segment download is requested.
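
To make "least bytes used in absolute terms" concrete, here is a minimal, self-contained sketch. It is not Druid code: `CacheLocation` is a hypothetical stand-in for `StorageLocation`, and the paths and sizes are made up. It contrasts ordering by bytes used with ordering by available space, the comparator this PR removes from `SegmentLoaderLocalCacheManager`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class LeastBytesUsedSketch
{
  // Hypothetical stand-in for StorageLocation: only what the ordering needs.
  static final class CacheLocation
  {
    final String path;
    final long maxSizeBytes;
    final long currSizeBytes;

    CacheLocation(String path, long maxSizeBytes, long currSizeBytes)
    {
      this.path = path;
      this.maxSizeBytes = maxSizeBytes;
      this.currSizeBytes = currSizeBytes;
    }

    long availableSizeBytes()
    {
      return maxSizeBytes - currSizeBytes;
    }
  }

  // Orders by bytes already used, ascending; capacity is ignored.
  static List<String> leastBytesUsedOrder(List<CacheLocation> locations)
  {
    return pathsSortedBy(locations, Comparator.comparingLong((CacheLocation l) -> l.currSizeBytes));
  }

  // For contrast only: orders by free space, descending.
  static List<String> mostAvailableOrder(List<CacheLocation> locations)
  {
    return pathsSortedBy(
        locations,
        Comparator.comparingLong((CacheLocation l) -> l.availableSizeBytes()).reversed()
    );
  }

  private static List<String> pathsSortedBy(List<CacheLocation> locations, Comparator<CacheLocation> order)
  {
    List<CacheLocation> copy = new ArrayList<>(locations); // never reorder the caller's list
    copy.sort(order);
    List<String> paths = new ArrayList<>();
    for (CacheLocation location : copy) {
      paths.add(location.path);
    }
    return paths;
  }

  public static void main(String[] args)
  {
    List<CacheLocation> locations = Arrays.asList(
        new CacheLocation("/mnt/small", 100L, 10L),
        new CacheLocation("/mnt/large", 1000L, 50L)
    );
    System.out.println(leastBytesUsedOrder(locations));
    System.out.println(mostAvailableOrder(locations));
  }
}
```

With these made-up numbers the small disk comes first under the bytes-used ordering even though the large disk has far more free space, which is the trade-off the phrase "in absolute terms" points at.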

#### Historical query configs

##### Concurrent Requests
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.loading;

import com.google.common.collect.Ordering;

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

/**
* A {@link StorageLocation} selector strategy that selects a segment cache location that is least filled each time
* among the available storage locations.
*/
public class LeastBytesUsedStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy
{
  private static final Ordering<StorageLocation> ORDERING = Ordering.from(Comparator
      .comparingLong(StorageLocation::currSizeBytes));

  private List<StorageLocation> storageLocations;

  public LeastBytesUsedStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
  {
    this.storageLocations = storageLocations;
  }

  @Override
  public Iterator<StorageLocation> getLocations()
  {
    return ORDERING.sortedCopy(this.storageLocations).iterator();
  }

  @Override
  public String toString()
  {
    return "LeastBytesUsedStorageLocationSelectorStrategy{" +
           "storageLocations=" + storageLocations +
           '}';
  }
}
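
Because `getLocations()` iterates over a sorted copy, the ordering reflects usage at the moment of the call and is not refreshed afterwards. The sketch below illustrates that snapshot behaviour with plain JDK collections. `Loc` and its mutable `usedBytes` field are hypothetical stand-ins, not the real `StorageLocation`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

final class SnapshotOrderingSketch
{
  // Hypothetical stand-in for StorageLocation with a mutable usage counter.
  static final class Loc
  {
    final String path;
    long usedBytes;

    Loc(String path, long usedBytes)
    {
      this.path = path;
      this.usedBytes = usedBytes;
    }
  }

  // Sorts a copy by bytes used, so the returned iterator is a snapshot of the ordering.
  static Iterator<Loc> getLocations(List<Loc> locations)
  {
    List<Loc> sorted = new ArrayList<>(locations);
    sorted.sort(Comparator.comparingLong((Loc l) -> l.usedBytes));
    return sorted.iterator();
  }

  public static void main(String[] args)
  {
    Loc a = new Loc("/mnt/a", 10L);
    Loc b = new Loc("/mnt/b", 20L);
    List<Loc> locations = Arrays.asList(a, b);

    Iterator<Loc> stale = getLocations(locations);
    a.usedBytes = 30L; // e.g. another thread finished loading a segment into /mnt/a

    System.out.println("stale iterator picks: " + stale.next().path);
    System.out.println("fresh iterator picks: " + getLocations(locations).next().path);
  }
}
```

An iterator obtained before the usage change still offers `/mnt/a` first, while a fresh call reflects the new sizes. This is the kind of sub-optimal choice that can occur when several loading threads run at once.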
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.loading;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A {@link StorageLocation} selector strategy that selects a segment cache location in a round-robin fashion each time
* among the available storage locations. When {@link Iterator#next()} is called on the iterator returned by
* {@link RoundRobinStorageLocationSelectorStrategy#getLocations()}, the locations are returned in a round-robin
* fashion even when multiple threads are in use.
*/
public class RoundRobinStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy
{

  private final List<StorageLocation> storageLocations;
  private final AtomicInteger startIndex = new AtomicInteger(0);

  public RoundRobinStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
  {
    this.storageLocations = storageLocations;
  }

  @Override
  public Iterator<StorageLocation> getLocations()
  {
    return new Iterator<StorageLocation>() {

      private final int numStorageLocations = storageLocations.size();
      private int remainingIterations = numStorageLocations;

      @Override
      public boolean hasNext()
      {
        return remainingIterations > 0;
      }

      @Override
      public StorageLocation next()
      {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        remainingIterations--;
        final StorageLocation nextLocation =
            storageLocations.get(startIndex.getAndUpdate(n -> (n + 1) % numStorageLocations));
        return nextLocation;
      }
    };
  }

}
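
The shared `AtomicInteger` cursor is what makes successive iterators start at different locations. The following sketch reproduces the same iterator logic over plain strings so that the rotation can be observed in isolation. `RoundRobinSketch` and `drain` are illustrative names, not part of this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch
{
  private final List<String> locations;
  // Shared across all iterators, so each next() call advances the global cursor.
  private final AtomicInteger startIndex = new AtomicInteger(0);

  RoundRobinSketch(List<String> locations)
  {
    this.locations = locations;
  }

  Iterator<String> getLocations()
  {
    final int numLocations = locations.size();
    return new Iterator<String>()
    {
      private int remaining = numLocations; // each iterator yields at most one full cycle

      @Override
      public boolean hasNext()
      {
        return remaining > 0;
      }

      @Override
      public String next()
      {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        remaining--;
        return locations.get(startIndex.getAndUpdate(i -> (i + 1) % numLocations));
      }
    };
  }

  // Helper for the demo: collects everything an iterator yields.
  static List<String> drain(Iterator<String> it)
  {
    List<String> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(it.next());
    }
    return out;
  }

  public static void main(String[] args)
  {
    RoundRobinSketch strategy = new RoundRobinSketch(Arrays.asList("a", "b", "c"));
    System.out.println(strategy.getLocations().next()); // caller stops after its first pick
    System.out.println(strategy.getLocations().next()); // next caller starts one position later
    System.out.println(drain(strategy.getLocations())); // a full cycle from the current cursor
  }
}
```

Note that the cursor advances once per `next()` call rather than once per `getLocations()` call, so a caller that has to try several locations before one succeeds also moves the starting point for later callers.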
@@ -20,8 +20,8 @@
package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.utils.JvmUtils;
import org.hibernate.validator.constraints.NotEmpty;

@@ -52,6 +52,9 @@ public class SegmentLoaderConfig
@JsonProperty("numBootstrapThreads")
private Integer numBootstrapThreads = null;

@JsonProperty("locationSelectorStrategy")
private StorageLocationSelectorStrategy locationSelectorStrategy;

@JsonProperty
private File infoDir = null;

@@ -88,16 +91,20 @@ public int getNumBootstrapThreads()
return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
}

public StorageLocationSelectorStrategy getStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
{
if (locationSelectorStrategy == null) {
// default strategy if no strategy is specified in the config
locationSelectorStrategy = new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations);
}
return locationSelectorStrategy;
}

public File getInfoDir()
{
if (infoDir == null) {

if (locations == null || locations.size() == 0) {
throw new ISE("You have no segment cache locations defined. Please configure druid.segmentCache.locations to use one or more locations.");
}
infoDir = new File(locations.get(0).getPath(), "info_dir");
}

return infoDir;
}

@@ -115,13 +122,21 @@ public SegmentLoaderConfig withLocations(List<StorageLocationConfig> locations)
return retVal;
}

@VisibleForTesting
SegmentLoaderConfig withStorageLocationSelectorStrategy(StorageLocationSelectorStrategy strategy)
{
this.locationSelectorStrategy = strategy;
return this;
}

@Override
public String toString()
{
return "SegmentLoaderConfig{" +
"locations=" + locations +
", deleteOnRemove=" + deleteOnRemove +
", dropSegmentDelayMillis=" + dropSegmentDelayMillis +
", locationSelectorStrategy=" + locationSelectorStrategy +
", infoDir=" + infoDir +
'}';
}
@@ -33,7 +33,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

@@ -42,9 +42,6 @@
public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
private static final Comparator<StorageLocation> COMPARATOR = Comparator
.comparingLong(StorageLocation::availableSizeBytes)
.reversed();

private final IndexIO indexIO;
private final SegmentLoaderConfig config;
@@ -77,6 +74,8 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
*/
private final ConcurrentHashMap<DataSegment, ReferenceCountingLock> segmentLocks = new ConcurrentHashMap<>();

private final StorageLocationSelectorStrategy strategy;

// Note that we only create this via injection in historical and realtime nodes. Peons create these
// objects via SegmentLoaderFactory objects, so that they can store segments in task-specific
// directories rather than statically configured directories.
@@ -101,7 +100,7 @@ public SegmentLoaderLocalCacheManager(
)
);
}
locations.sort(COMPARATOR);
this.strategy = config.getStorageLocationSelectorStrategy(locations);
}

@Override
@@ -175,10 +174,17 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
* location may fail because of IO failure, most likely in two cases:<p>
* 1. Druid doesn't have write access to this location, most likely because the administrator didn't configure it correctly<p>
* 2. disk failure: Druid can't read from or write to this disk anymore
*
* Locations are fetched using {@link StorageLocationSelectorStrategy}.
*/
private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
{
for (StorageLocation loc : locations) {
Iterator<StorageLocation> locationsIterator = strategy.getLocations();

while (locationsIterator.hasNext()) {

StorageLocation loc = locationsIterator.next();

File storageDir = loc.reserve(storageDirStr, segment);
if (storageDir != null) {
try {
@@ -177,4 +177,9 @@ public synchronized long availableSizeBytes()
{
return maxSizeBytes - currSizeBytes;
}

public synchronized long currSizeBytes()
{
return currSizeBytes;
}
}
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.timeline.DataSegment;

import java.util.Iterator;

/**
* This interface describes the storage location selection strategy which is responsible for ordering the
* available {@link StorageLocation}s for segment distribution.
*
* Only a snapshot of the locations is returned here. The implementations currently do not handle all kinds of
* concurrency issues and accesses to the underlying storage. Please see
* https://github.com/apache/incubator-druid/pull/8038#discussion_r325520829 of PR
* https://github.com/apache/incubator-druid/pull/8038 for more details.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl =
LeastBytesUsedStorageLocationSelectorStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "leastBytesUsed", value = LeastBytesUsedStorageLocationSelectorStrategy.class),
@JsonSubTypes.Type(name = "roundRobin", value = RoundRobinStorageLocationSelectorStrategy.class)
})
public interface StorageLocationSelectorStrategy
{
/**
* Finds the best ordering of the {@link StorageLocation}s to load a {@link DataSegment} according to
* the location selector strategy. This method returns an iterator instead of a single best location. The
* caller is responsible for iterating over the locations and calling the {@link StorageLocation#reserve}
* method, because any single location may be unusable, for example due to a failed disk, or may become
* unwritable for other reasons.
*
* This method can be called by different threads and so should be thread-safe.
*
* @return An iterator of {@link StorageLocation}s from which the callers can iterate and pick a location.
*/
Contributor

I almost forgot. Would you please add to javadoc that this method can be called by different threads and so should be thread-safe?

Contributor Author

Updated javadoc.

Iterator<StorageLocation> getLocations();
}
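
The contract above leaves the fallback loop to the caller. As a rough illustration of that division of labour, the sketch below walks an iterator of candidate locations and stops at the first one that accepts the reservation. Everything here is hypothetical: `Location`, `FixedLocation`, and the boolean-returning `reserve` are simplified stand-ins, whereas the real `StorageLocation#reserve` shown in this PR returns a `File` or `null`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class ReserveLoopSketch
{
  // Hypothetical, simplified view of a storage location.
  interface Location
  {
    String path();

    boolean reserve(long segmentSizeBytes);
  }

  // Toy implementation: a reservation succeeds only while enough free space remains.
  static final class FixedLocation implements Location
  {
    private final String path;
    private long freeBytes;

    FixedLocation(String path, long freeBytes)
    {
      this.path = path;
      this.freeBytes = freeBytes;
    }

    @Override
    public String path()
    {
      return path;
    }

    @Override
    public synchronized boolean reserve(long segmentSizeBytes)
    {
      if (segmentSizeBytes > freeBytes) {
        return false;
      }
      freeBytes -= segmentSizeBytes;
      return true;
    }
  }

  // Tries candidates in the order the strategy provides; returns null if none accepts.
  static String firstReservable(Iterator<Location> candidates, long segmentSizeBytes)
  {
    while (candidates.hasNext()) {
      Location location = candidates.next();
      if (location.reserve(segmentSizeBytes)) {
        return location.path();
      }
    }
    return null;
  }

  public static void main(String[] args)
  {
    List<Location> ordered = Arrays.<Location>asList(
        new FixedLocation("/mnt/full", 5L),
        new FixedLocation("/mnt/ok", 100L)
    );
    System.out.println(firstReservable(ordered.iterator(), 10L));
  }
}
```

Returning an ordering rather than a single location keeps the strategy free of failure handling: the strategy only ranks the candidates, and the caller decides what counts as a usable location.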