Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -231,7 +232,6 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup
* addSegment method.
*/
public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile)
throws IOException
{
final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
supervisorTaskId,
Expand All @@ -243,7 +243,7 @@ public void addSegment(String supervisorTaskId, String subTaskId, DataSegment se
public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId)
{
for (StorageLocation location : shuffleDataLocations) {
final File partitionDir = getPartitionDir(location, supervisorTaskId, interval, partitionId);
final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, partitionId));
if (partitionDir.exists()) {
supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc());
final File[] segmentFiles = partitionDir.listFiles();
Expand Down Expand Up @@ -279,53 +279,65 @@ public static void addSegment(
String subTaskId,
DataSegment segment,
File segmentFile
) throws IOException
{
final StorageLocation location = findLocationForSegment(cyclicIterator, numLocations, segment);
final File destFile = new File(
getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()),
subTaskId
);
FileUtils.forceMkdirParent(destFile);
final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
if (copiedBytes == 0) {
throw new IOE(
"0 bytes copied after copying a segment file from [%s] to [%s]",
segmentFile.getAbsolutePath(),
destFile.getAbsolutePath()
);
}
location.addFile(destFile);
}

private static StorageLocation findLocationForSegment(
Iterator<StorageLocation> cyclicIterator,
int numLocations,
DataSegment segment
)
{
for (int i = 0; i < numLocations; i++) {
final StorageLocation location = cyclicIterator.next();
if (location.canHandle(segment)) {
return location;
final File destFile = location.reserve(
getPartitionFilePath(
supervisorTaskId,
subTaskId,
segment.getInterval(),
segment.getShardSpec().getPartitionNum()
),
segment.getId(),
segmentFile.length()
);
if (destFile != null) {
try {
FileUtils.forceMkdirParent(destFile);
final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
if (copiedBytes == 0) {
throw new IOE(
"0 bytes copied after copying a segment file from [%s] to [%s]",
segmentFile.getAbsolutePath(),
destFile.getAbsolutePath()
);
} else {
return;
}
}
catch (IOException e) {
  // Only log here to try other locations as well.
  log.warn(e, "Failed to write segmentFile at [%s]", destFile);
  // Release the reservation made by reserve() above. It was registered under destFile, so passing
  // segmentFile (the source of the copy) would never match and the reserved bytes would stay counted
  // against this location. removeFile() measures the file on disk, so only call it when the
  // destination was actually created.
  // NOTE(review): a partially written destFile releases fewer bytes than were reserved; confirm
  // whether StorageLocation should offer a way to release by the originally reserved size.
  if (destFile.exists()) {
    location.removeFile(destFile);
  }
}
}
}
throw new ISE("Can't find location to handle segment[%s]", segment);
}

private static File getPartitionDir(
StorageLocation location,
/**
 * Builds the path of the file that holds the data written by one sub task for one partition.
 * The result is the partition directory computed by {@code getPartitionDir} with {@code subTaskId}
 * appended as the last path element.
 *
 * @param supervisorTaskId id of the supervisor task owning the partition
 * @param subTaskId        id of the sub task; used as the file name
 * @param interval         interval of the partition
 * @param partitionId      partition number
 * @return the partition file path as a string
 */
private static String getPartitionFilePath(
    String supervisorTaskId,
    String subTaskId,
    Interval interval,
    int partitionId
)
{
  final String partitionDir = getPartitionDir(supervisorTaskId, interval, partitionId);
  return Paths.get(partitionDir, subTaskId).toString();
}

private static String getPartitionDir(
String supervisorTaskId,
Interval interval,
int partitionId
)
{
return FileUtils.getFile(
location.getPath(),
return Paths.get(
supervisorTaskId,
interval.getStart().toString(),
interval.getEnd().toString(),
String.valueOf(partitionId)
);
).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.apache.druid.guice.annotations.Json;
Expand All @@ -43,8 +42,9 @@
public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
private static final Comparator<StorageLocation> COMPARATOR = (left, right) ->
Longs.compare(right.available(), left.available());
private static final Comparator<StorageLocation> COMPARATOR = Comparator
.comparingLong(StorageLocation::availableSizeBytes)
.reversed();

private final IndexIO indexIO;
private final SegmentLoaderConfig config;
Expand Down Expand Up @@ -163,9 +163,7 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
if (loc == null) {
loc = loadSegmentWithRetry(segment, storageDir);
}
final File localStorageDir = new File(loc.getPath(), storageDir);
loc.addSegmentDir(localStorageDir, segment);
return localStorageDir;
return new File(loc.getPath(), storageDir);
}
finally {
unlock(segment, lock);
Expand All @@ -181,23 +179,24 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
{
for (StorageLocation loc : locations) {
if (loc.canHandle(segment)) {
File storageDir = new File(loc.getPath(), storageDirStr);

File storageDir = loc.reserve(storageDirStr, segment);
if (storageDir != null) {
try {
loadInLocationWithStartMarker(segment, storageDir);
return loc;
}
catch (SegmentLoadingException e) {
log.makeAlert(
e,
"Failed to load segment in current location %s, try next location if any",
loc.getPath().getAbsolutePath()
)
.addData("location", loc.getPath().getAbsolutePath())
.emit();

cleanupCacheFiles(loc.getPath(), storageDir);
try {
log.makeAlert(
e,
"Failed to load segment in current location [%s], try next location if any",
loc.getPath().getAbsolutePath()
).addData("location", loc.getPath().getAbsolutePath()).emit();
}
finally {
loc.removeSegmentDir(storageDir, segment);
cleanupCacheFiles(loc.getPath(), storageDir);
}
}
}
}
Expand Down Expand Up @@ -366,4 +365,10 @@ public ConcurrentHashMap<DataSegment, ReferenceCountingLock> getSegmentLocks()
{
return segmentLocks;
}

/**
 * Gives tests access to the storage locations used by this manager.
 * The list is returned as-is, not as a copy.
 */
@VisibleForTesting
public List<StorageLocation> getLocations()
{
  return this.locations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,51 @@

package org.apache.druid.segment.loading;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.io.File;
import java.util.HashSet;
import java.util.Set;

/**
* This class is a very simple logical representation of a local path. It keeps track of files stored under the
* {@link #path} via {@link #reserve}, so that the total size of stored files doesn't exceed the {@link #maxSizeBytes}
* and the free space under the path is always kept at or above {@link #freeSpaceToKeep}.
*
* This class is thread-safe, so that multiple threads can update its state at the same time.
* One example usage is that a historical can use multiple threads to load different segments in parallel
* from deep storage.
*/
public class StorageLocation
{
private static final Logger log = new Logger(StorageLocation.class);

private final File path;
private final long maxSize;
private final long maxSizeBytes;
private final long freeSpaceToKeep;

/**
* Set of files stored under the {@link #path}.
*/
@GuardedBy("this")
private final Set<File> files = new HashSet<>();

private volatile long currSize = 0;
/**
* Current total size of files in bytes.
*/
@GuardedBy("this")
private long currSizeBytes = 0;

public StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
public StorageLocation(File path, long maxSizeBytes, @Nullable Double freeSpacePercent)
{
this.path = path;
this.maxSize = maxSize;
this.maxSizeBytes = maxSizeBytes;

if (freeSpacePercent != null) {
long totalSpaceInPartition = path.getTotalSpace();
Expand All @@ -66,73 +84,86 @@ public File getPath()
return path;
}

public long getMaxSize()
{
return maxSize;
}

/**
* Add a new file to this location. The given file argument must be a file rather than directory.
* Remove a segment file from this location. The given file argument must be a file rather than directory.
*/
public synchronized void addFile(File file)
public synchronized void removeFile(File file)
{
if (file.isDirectory()) {
throw new ISE("[%s] must be a file. Use a");
}
if (files.add(file)) {
currSize += FileUtils.sizeOf(file);
if (files.remove(file)) {
currSizeBytes -= FileUtils.sizeOf(file);
} else {
log.warn("File[%s] is not found under this location[%s]", file, path);
}
}

/**
* Add a new segment dir to this location. The segment size is added to currSize.
* Remove a segment dir from this location. The segment size is subtracted from currSizeBytes.
*/
public synchronized void addSegmentDir(File segmentDir, DataSegment segment)
public synchronized void removeSegmentDir(File segmentDir, DataSegment segment)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about it a bit, it doesn't seem particularly beneficial for StorageLocation to have to be aware of DataSegment and SegmentId, since DataSegment is only used as a shorthand to get the size and SegmentId, and the SegmentId is only used for logging. It seems like reworking this might make the implementation of StorageLocation a bit simpler and allow IntermediaryDataManager and SegmentLoaderLocalCacheManager to use the same reserve and remove methods. I would consider this optional, however; it's up to you whether you want to investigate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already looked into that approach. The thing is, StorageLocation is used differently in IntermediaryDataManager and SegmentLoaderLocalCacheManager. In IntermediaryDataManager, the compressed segment files are stored and registered in StorageLocation. In SegmentLoaderLocalCacheManager, the uncompressed segment directories are registered in StorageLocation. This led to using different size computations in removeFile(File) and removeSegmentDir(File, DataSegment). I wanted to make sure that the caller is aware of what it's registering and calls the right method.

I guess we could have an abstract StorageLocation class with two implementations for the different use cases. But I'm not sure it would be very beneficial at this point, because StorageLocation is still pretty simple.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation, sgtm 👍

{
if (files.add(segmentDir)) {
currSize += segment.getSize();
if (files.remove(segmentDir)) {
currSizeBytes -= segment.getSize();
} else {
log.warn("SegmentDir[%s] is not found under this location[%s]", segmentDir, path);
}
}

/**
* Remove a segment file from this location. The given file argument must be a file rather than directory.
* Reserves space to store the given segment. The segment size is added to currSizeBytes.
* If it succeeds, it returns a file for the given segmentDir in this storage location. Returns null otherwise.
*/
public synchronized void removeFile(File file)
@Nullable
public synchronized File reserve(String segmentDir, DataSegment segment)
{
if (files.remove(file)) {
currSize -= FileUtils.sizeOf(file);
}
return reserve(segmentDir, segment.getId(), segment.getSize());
}

/**
* Remove a segment dir from this location. The segment size is subtracted from currSize.
* Reserves space to store the given segment.
* If it succeeds, it returns a file for the given segmentFilePathToAdd in this storage location.
* Returns null otherwise.
*/
public synchronized void removeSegmentDir(File segmentDir, DataSegment segment)
@Nullable
public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentId, long segmentSize)
{
if (files.remove(segmentDir)) {
currSize -= segment.getSize();
final File segmentFileToAdd = new File(path, segmentFilePathToAdd);
if (files.contains(segmentFileToAdd)) {
return null;
}
if (canHandle(segmentId, segmentSize)) {
files.add(segmentFileToAdd);
currSizeBytes += segmentSize;
return segmentFileToAdd;
} else {
return null;
}
}

public boolean canHandle(DataSegment segment)
/**
* This method is package-private only so that unit tests can call it. Production code must not call this method directly.
* Use {@link #reserve} instead.
*/
@VisibleForTesting
@GuardedBy("this")
boolean canHandle(SegmentId segmentId, long segmentSize)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should be annotated @GuardedBy("this"). See https://github.com/code-review-checklists/java-concurrency#guarded-by

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

{
if (available() < segment.getSize()) {
if (availableSizeBytes() < segmentSize) {
log.warn(
"Segment[%s:%,d] too large for storage[%s:%,d]. Check your druid.segmentCache.locations maxSize param",
segment.getId(), segment.getSize(), getPath(), available()
segmentId, segmentSize, getPath(), availableSizeBytes()
);
return false;
}

if (freeSpaceToKeep > 0) {
long currFreeSpace = path.getFreeSpace();
if ((freeSpaceToKeep + segment.getSize()) > currFreeSpace) {
if ((freeSpaceToKeep + segmentSize) > currFreeSpace) {
log.warn(
"Segment[%s:%,d] too large for storage[%s:%,d] to maintain suggested freeSpace[%d], current freeSpace is [%d].",
segment.getId(),
segment.getSize(),
segmentId,
segmentSize,
getPath(),
available(),
availableSizeBytes(),
freeSpaceToKeep,
currFreeSpace
);
Expand All @@ -143,8 +174,8 @@ public boolean canHandle(DataSegment segment)
return true;
}

public synchronized long available()
public synchronized long availableSizeBytes()
{
return maxSize - currSize;
return maxSizeBytes - currSizeBytes;
}
}
Loading