diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index bfd202e8835a..c47425a4ee87 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -45,6 +45,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -231,7 +232,6 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup * addSegment method. */ public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile) - throws IOException { final Iterator iterator = locationIterators.computeIfAbsent( supervisorTaskId, @@ -243,7 +243,7 @@ public void addSegment(String supervisorTaskId, String subTaskId, DataSegment se public List findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId) { for (StorageLocation location : shuffleDataLocations) { - final File partitionDir = getPartitionDir(location, supervisorTaskId, interval, partitionId); + final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, partitionId)); if (partitionDir.exists()) { supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc()); final File[] segmentFiles = partitionDir.listFiles(); @@ -279,53 +279,65 @@ public static void addSegment( String subTaskId, DataSegment segment, File segmentFile - ) throws IOException - { - final StorageLocation location = findLocationForSegment(cyclicIterator, numLocations, segment); - final File destFile = new File( - getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()), - subTaskId - ); - FileUtils.forceMkdirParent(destFile); - final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); - if (copiedBytes == 0) { - throw new IOE( - "0 bytes copied after copying a segment file from [%s] to [%s]", - segmentFile.getAbsolutePath(), - destFile.getAbsolutePath() - ); - } - location.addFile(destFile); - } - - private static StorageLocation findLocationForSegment( - Iterator cyclicIterator, - int numLocations, - DataSegment segment ) { for (int i = 0; i < numLocations; i++) { final StorageLocation location = cyclicIterator.next(); - if (location.canHandle(segment)) { - return location; + final File destFile = location.reserve( + getPartitionFilePath( + supervisorTaskId, + subTaskId, + segment.getInterval(), + segment.getShardSpec().getPartitionNum() + ), + segment.getId(), + segmentFile.length() + ); + if (destFile != null) { + try { + FileUtils.forceMkdirParent(destFile); + final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); + if (copiedBytes == 0) { + throw new IOE( + "0 bytes copied after copying a segment file from [%s] to [%s]", + segmentFile.getAbsolutePath(), + destFile.getAbsolutePath() + ); + } else { + return; + } + } + catch (IOException e) { + // Only log here to try other locations as well. + log.warn(e, "Failed to write segmentFile at [%s]", destFile); + location.removeFile(segmentFile); + } } } throw new ISE("Can't find location to handle segment[%s]", segment); } - private static File getPartitionDir( - StorageLocation location, + private static String getPartitionFilePath( + String supervisorTaskId, + String subTaskId, + Interval interval, + int partitionId + ) + { + return Paths.get(getPartitionDir(supervisorTaskId, interval, partitionId), subTaskId).toString(); + } + + private static String getPartitionDir( String supervisorTaskId, Interval interval, int partitionId ) { - return FileUtils.getFile( - location.getPath(), + return Paths.get( supervisorTaskId, interval.getStart().toString(), interval.getEnd().toString(), String.valueOf(partitionId) - ); + ).toString(); } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index b25c2163f71d..e0f0f0b4ccdb 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.primitives.Longs; import com.google.inject.Inject; import org.apache.commons.io.FileUtils; import org.apache.druid.guice.annotations.Json; @@ -43,8 +42,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader { private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); - private static final Comparator COMPARATOR = (left, right) -> - Longs.compare(right.available(), left.available()); + private static final Comparator COMPARATOR = Comparator + .comparingLong(StorageLocation::availableSizeBytes) + .reversed(); private final IndexIO indexIO; private final SegmentLoaderConfig config; @@ -163,9 +163,7 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException if (loc == null) { loc = loadSegmentWithRetry(segment, storageDir); } - final File localStorageDir = new File(loc.getPath(), storageDir); - loc.addSegmentDir(localStorageDir, segment); - return localStorageDir; + return new File(loc.getPath(), storageDir); } finally { unlock(segment, lock); @@ -181,23 +179,24 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { for (StorageLocation loc : locations) { - if (loc.canHandle(segment)) { - File storageDir = new File(loc.getPath(), storageDirStr); - + File storageDir = loc.reserve(storageDirStr, segment); + if (storageDir != null) { try { loadInLocationWithStartMarker(segment, storageDir); return loc; } catch (SegmentLoadingException e) { - log.makeAlert( - e, - "Failed to load segment in current location %s, try next location if any", - loc.getPath().getAbsolutePath() - ) - .addData("location", loc.getPath().getAbsolutePath()) - .emit(); - - cleanupCacheFiles(loc.getPath(), storageDir); + try { + log.makeAlert( + e, + "Failed to load segment in current location [%s], try next location if any", + loc.getPath().getAbsolutePath() + ).addData("location", loc.getPath().getAbsolutePath()).emit(); + } + finally { + loc.removeSegmentDir(storageDir, segment); + cleanupCacheFiles(loc.getPath(), storageDir); + } } } } @@ -366,4 +365,10 @@ public ConcurrentHashMap getSegmentLocks() { return segmentLocks; } + + @VisibleForTesting + public List getLocations() + { + return locations; + } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index 4fb4c4ea0bc7..842295ebb9fb 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -19,33 +19,51 @@ package org.apache.druid.segment.loading; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.FileUtils; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.File; import java.util.HashSet; import java.util.Set; /** + * This class is a very simple logical representation of a local path. It keeps track of files stored under the + * {@link #path} via {@link #reserve}, so that the total size of stored files doesn't exceed the {@link #maxSizeBytes} + * and available space is always kept smaller than {@link #freeSpaceToKeep}. + * + * This class is thread-safe, so that multiple threads can update its state at the same time. + * One example usage is that a historical can use multiple threads to load different segments in parallel + * from deep storage. */ public class StorageLocation { private static final Logger log = new Logger(StorageLocation.class); private final File path; - private final long maxSize; + private final long maxSizeBytes; private final long freeSpaceToKeep; + + /** + * Set of files stored under the {@link #path}. + */ + @GuardedBy("this") private final Set files = new HashSet<>(); - private volatile long currSize = 0; + /** + * Current total size of files in bytes. + */ + @GuardedBy("this") + private long currSizeBytes = 0; - public StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent) + public StorageLocation(File path, long maxSizeBytes, @Nullable Double freeSpacePercent) { this.path = path; - this.maxSize = maxSize; + this.maxSizeBytes = maxSizeBytes; if (freeSpacePercent != null) { long totalSpaceInPartition = path.getTotalSpace(); @@ -66,73 +84,86 @@ public File getPath() return path; } - public long getMaxSize() - { - return maxSize; - } - /** - * Add a new file to this location. The given file argument must be a file rather than directory. + * Remove a segment file from this location. The given file argument must be a file rather than directory. */ - public synchronized void addFile(File file) + public synchronized void removeFile(File file) { - if (file.isDirectory()) { - throw new ISE("[%s] must be a file. Use a"); - } - if (files.add(file)) { - currSize += FileUtils.sizeOf(file); + if (files.remove(file)) { + currSizeBytes -= FileUtils.sizeOf(file); + } else { + log.warn("File[%s] is not found under this location[%s]", file, path); } } /** - * Add a new segment dir to this location. The segment size is added to currSize. + * Remove a segment dir from this location. The segment size is subtracted from currSizeBytes. */ - public synchronized void addSegmentDir(File segmentDir, DataSegment segment) + public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) { - if (files.add(segmentDir)) { - currSize += segment.getSize(); + if (files.remove(segmentDir)) { + currSizeBytes -= segment.getSize(); + } else { + log.warn("SegmentDir[%s] is not found under this location[%s]", segmentDir, path); } } /** - * Remove a segment file from this location. The given file argument must be a file rather than directory. + * Reserves space to store the given segment. The segment size is added to currSizeBytes. + * If it succeeds, it returns a file for the given segmentDir in this storage location. Returns null otherwise. */ - public synchronized void removeFile(File file) + @Nullable + public synchronized File reserve(String segmentDir, DataSegment segment) { - if (files.remove(file)) { - currSize -= FileUtils.sizeOf(file); - } + return reserve(segmentDir, segment.getId(), segment.getSize()); } /** - * Remove a segment dir from this location. The segment size is subtracted from currSize. + * Reserves space to store the given segment. + * If it succeeds, it returns a file for the given segmentFilePathToAdd in this storage location. + * Returns null otherwise. */ - public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) + @Nullable + public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentId, long segmentSize) { - if (files.remove(segmentDir)) { - currSize -= segment.getSize(); + final File segmentFileToAdd = new File(path, segmentFilePathToAdd); + if (files.contains(segmentFileToAdd)) { + return null; + } + if (canHandle(segmentId, segmentSize)) { + files.add(segmentFileToAdd); + currSizeBytes += segmentSize; + return segmentFileToAdd; + } else { + return null; } } - public boolean canHandle(DataSegment segment) + /** + * This method is only package-private to use it in unit tests. Production code must not call this method directly. + * Use {@link #reserve} instead. + */ + @VisibleForTesting + @GuardedBy("this") + boolean canHandle(SegmentId segmentId, long segmentSize) { - if (available() < segment.getSize()) { + if (availableSizeBytes() < segmentSize) { log.warn( "Segment[%s:%,d] too large for storage[%s:%,d]. Check your druid.segmentCache.locations maxSize param", - segment.getId(), segment.getSize(), getPath(), available() + segmentId, segmentSize, getPath(), availableSizeBytes() ); return false; } if (freeSpaceToKeep > 0) { long currFreeSpace = path.getFreeSpace(); - if ((freeSpaceToKeep + segment.getSize()) > currFreeSpace) { + if ((freeSpaceToKeep + segmentSize) > currFreeSpace) { log.warn( "Segment[%s:%,d] too large for storage[%s:%,d] to maintain suggested freeSpace[%d], current freeSpace is [%d].", - segment.getId(), - segment.getSize(), + segmentId, + segmentSize, getPath(), - available(), + availableSizeBytes(), freeSpaceToKeep, currFreeSpace ); @@ -143,8 +174,8 @@ public boolean canHandle(DataSegment segment) return true; } - public synchronized long available() + public synchronized long availableSizeBytes() { - return maxSize - currSize; + return maxSizeBytes - currSizeBytes; } } diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java new file mode 100644 index 000000000000..cb0821717668 --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.hamcrest.CoreMatchers; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +public class SegmentLoaderLocalCacheManagerConcurrencyTest +{ + @Rule + public final TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + private final ObjectMapper jsonMapper; + private final String dataSource = "test_ds"; + private final String segmentVersion; + + private File localSegmentCacheFolder; + private SegmentLoaderLocalCacheManager manager; + private ExecutorService executorService; + + public SegmentLoaderLocalCacheManagerConcurrencyTest() + { + jsonMapper = new DefaultObjectMapper(); + jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); + jsonMapper.setInjectableValues( + new InjectableValues.Std().addValue( + LocalDataSegmentPuller.class, + new LocalDataSegmentPuller() + ) + ); + segmentVersion = DateTimes.nowUtc().toString(); + } + + @Before + public void setUp() throws Exception + { + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder"); + + final List locations = new ArrayList<>(); + // Each segment has the size of 1000 bytes. This deep storage is capable of storing up to 2 segments. + final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 2000L, null); + locations.add(locationConfig); + + manager = new SegmentLoaderLocalCacheManager( + TestHelper.getTestIndexIO(), + new SegmentLoaderConfig().withLocations(locations), + jsonMapper + ); + executorService = Execs.multiThreaded(4, "segment-loader-local-cache-manager-concurrency-test-%d"); + } + + @After + public void tearDown() + { + executorService.shutdownNow(); + } + + @Test + public void testGetSegment() throws IOException, ExecutionException, InterruptedException + { + final File localStorageFolder = tmpFolder.newFolder("local_storage_folder"); + final List segmentsToLoad = new ArrayList<>(4); + + final Interval interval = Intervals.of("2019-01-01/P1D"); + for (int partitionId = 0; partitionId < 4; partitionId++) { + final String segmentPath = Paths.get( + localStorageFolder.getCanonicalPath(), + dataSource, + StringUtils.format("%s_%s", interval.getStart().toString(), interval.getEnd().toString()), + segmentVersion, + String.valueOf(partitionId) + ).toString(); + // manually create a local segment under localStorageFolder + final File localSegmentFile = new File( + localStorageFolder, + segmentPath + ); + localSegmentFile.mkdirs(); + final File indexZip = new File(localSegmentFile, "index.zip"); + indexZip.createNewFile(); + + final DataSegment segment = newSegment(interval, partitionId).withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + localSegmentFile.getAbsolutePath() + ) + ); + segmentsToLoad.add(segment); + } + + final List futures = segmentsToLoad + .stream() + .map(segment -> executorService.submit(() -> manager.getSegmentFiles(segment))) + .collect(Collectors.toList()); + + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(SegmentLoadingException.class)); + expectedException.expectMessage("Failed to load segment"); + for (Future future : futures) { + future.get(); + } + + System.out.println(manager.getLocations().get(0).availableSizeBytes()); + } + + private DataSegment newSegment(Interval interval, int partitionId) + { + return DataSegment.builder() + .dataSource(dataSource) + .interval(interval) + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + "somewhere" + ) + ) + .version(segmentVersion) + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) + .shardSpec(new NumberedShardSpec(partitionId, 0)) + .binaryVersion(9) + .size(1000L) + .build(); + } +} diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index cdfcd47ce8dc..23da42298783 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -38,18 +39,18 @@ public void testStorageLocationFreePercent() { // free space ignored only maxSize matters StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null); - Assert.assertTrue(locationPlain.canHandle(makeSegment("2012/2013", 9_000))); - Assert.assertFalse(locationPlain.canHandle(makeSegment("2012/2013", 11_000))); + Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013"), 9_000)); + Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013"), 11_000)); // enough space available maxSize is the limit StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0); - Assert.assertTrue(locationFree.canHandle(makeSegment("2012/2013", 9_000))); - Assert.assertFalse(locationFree.canHandle(makeSegment("2012/2013", 11_000))); + Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013"), 9_000)); + Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013"), 11_000)); // disk almost full percentage is the limit StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0); - Assert.assertTrue(locationFull.canHandle(makeSegment("2012/2013", 4_000))); - Assert.assertFalse(locationFull.canHandle(makeSegment("2012/2013", 6_000))); + Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013"), 4_000)); + Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013"), 6_000)); } private StorageLocation fakeLocation(long total, long free, long max, Double percent) @@ -71,34 +72,34 @@ public void testStorageLocation() final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23); - loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve("test1", makeSegment("2012-01-01/2012-01-02", 10)); expectedAvail -= 10; verifyLoc(expectedAvail, loc); - loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve("test1", makeSegment("2012-01-01/2012-01-02", 10)); verifyLoc(expectedAvail, loc); - loc.addSegmentDir(new File("test2"), secondSegment); + loc.reserve("test2", secondSegment); expectedAvail -= 23; verifyLoc(expectedAvail, loc); - loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.removeSegmentDir(new File("/tmp/test1"), makeSegment("2012-01-01/2012-01-02", 10)); expectedAvail += 10; verifyLoc(expectedAvail, loc); - loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.removeSegmentDir(new File("/tmp/test1"), makeSegment("2012-01-01/2012-01-02", 10)); verifyLoc(expectedAvail, loc); - loc.removeSegmentDir(new File("test2"), secondSegment); + loc.removeSegmentDir(new File("/tmp/test2"), secondSegment); expectedAvail += 23; verifyLoc(expectedAvail, loc); } private void verifyLoc(long maxSize, StorageLocation loc) { - Assert.assertEquals(maxSize, loc.available()); + Assert.assertEquals(maxSize, loc.availableSizeBytes()); for (int i = 0; i <= maxSize; ++i) { - Assert.assertTrue(String.valueOf(i), loc.canHandle(makeSegment("2013/2014", i))); + Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014"), i)); } } @@ -116,4 +117,9 @@ private DataSegment makeSegment(String intervalString, long size) size ); } + + private SegmentId newSegmentId(String intervalString) + { + return SegmentId.of("test", Intervals.of(intervalString), "1", 0); + } }